use anyhow::{Context, Result};
use log::{error, warn, info};
use std::time::Duration;
use std::sync::{Arc, Mutex};
use chrono::{DateTime, Utc};
use yatws::{
IBKRClient,
contract::{Contract, WhatToShow, BarSize, SecType},
data::{MarketDataType, TickType, GenericTickType, TickByTickRequestType},
data_observer::{MarketDataObserver, RealTimeBarsObserver, TickByTickObserver, MarketDepthObserver,
HistoricalDataObserver, HistoricalTicksObserver},
};
pub(super) fn observe_market_data_impl(client: &IBKRClient, is_live: bool) -> Result<()> {
info!("--- Testing Observe Market Data (request_observe_market_data) ---");
let data_mgr = client.data_market();
let contract = Contract::stock("MSFT");
#[derive(Debug)]
struct TestMarketObserver {
name: String,
error_occurred: std::sync::atomic::AtomicBool,
}
impl MarketDataObserver for TestMarketObserver {
fn on_tick_price(&self, req_id: i32, tick_type: TickType, price: f64, _attrib: yatws::data::TickAttrib) {
info!("[{}] TickPrice: ReqID={}, Type={:?}, Price={}", self.name, req_id, tick_type, price);
}
fn on_tick_size(&self, req_id: i32, tick_type: TickType, size: f64) {
info!("[{}] TickSize: ReqID={}, Type={:?}, Size={}", self.name, req_id, tick_type, size);
}
fn on_tick_string(&self, req_id: i32, tick_type: TickType, value: &str) {
info!("[{}] TickString: ReqID={}, Type={:?}, Value='{}'", self.name, req_id, tick_type, value);
}
fn on_tick_snapshot_end(&self, req_id: i32) {
info!("[{}] TickSnapshotEnd: ReqID={}", self.name, req_id);
}
fn on_market_data_type(&self, req_id: i32, market_data_type: MarketDataType) {
info!("[{}] MarketDataTypeSet: ReqID={}, Type={:?}", self.name, req_id, market_data_type);
}
fn on_error(&self, req_id: i32, error_code: i32, error_message: &str) {
error!("[{}] Error: ReqID={}, Code={}, Msg='{}'", self.name, req_id, error_code, error_message);
self.error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
}
}
let observer = TestMarketObserver { name: "MSFT-Observer".to_string(), error_occurred: Default::default() };
let generic_tick_list: &[GenericTickType] = &[];
let snapshot = false; let mkt_data_options = &[];
info!("Requesting observed market data for {} (Generic Ticks: '{}')...",
contract.symbol,
generic_tick_list.iter().map(|t| t.to_string()).collect::<Vec<_>>().join(","));
let (req_id, observer_id) = data_mgr.request_observe_market_data(
&contract,
generic_tick_list,
snapshot,
false, mkt_data_options,
Some(MarketDataType::Delayed),
observer,
).context("Failed to request observed market data")?;
info!("Market data requested with ReqID: {}, ObserverID: {:?}. Waiting for data...", req_id, observer_id);
let wait_duration = if is_live { Duration::from_secs(15) } else { Duration::from_millis(500) }; info!("Waiting for {:?} to capture streaming data...", wait_duration);
std::thread::sleep(wait_duration);
info!("Cancelling market data request (ReqID: {})...", req_id);
data_mgr.cancel_market_data(req_id).context("Failed to cancel market data")?;
info!("Removing market data observer (ObserverID: {:?})...", observer_id);
data_mgr.remove_market_data_observer(observer_id);
info!("Market data request and observer handled.");
let _observer_had_error = client.data_market() ;
Ok(())
}
pub(super) fn observe_realtime_bars_impl(client: &IBKRClient, is_live: bool) -> Result<()> {
info!("--- Testing Observe Real Time Bars ---");
let data_mgr = client.data_market();
let contract = Contract::stock("AAPL");
#[derive(Debug)]
struct TestRealTimeBarsObserver {
name: String,
bar_count: Arc<Mutex<usize>>,
error_occurred: std::sync::atomic::AtomicBool,
}
impl RealTimeBarsObserver for TestRealTimeBarsObserver {
fn on_bar_update(&self, req_id: i32, bar: &yatws::contract::Bar) {
info!("[{}] Bar Update: ReqID={}, Time={}, OHLC={},{},{},{}, Vol={}",
self.name, req_id, bar.time.format("%H:%M:%S"),
bar.open, bar.high, bar.low, bar.close, bar.volume);
{
let mut count = self.bar_count.lock().unwrap();
*count += 1;
}
}
fn on_error(&self, req_id: i32, error_code: i32, error_message: &str) {
error!("[{}] Error: ReqID={}, Code={}, Msg='{}'",
self.name, req_id, error_code, error_message);
self.error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
}
}
let bar_count = Arc::new(Mutex::new(0));
let observer = TestRealTimeBarsObserver {
name: "AAPL-RealTimeBars".to_string(),
bar_count: Arc::clone(&bar_count),
error_occurred: Default::default(),
};
info!("Requesting observed real-time bars for {}...", contract.symbol);
let (req_id, observer_id) = data_mgr.request_observe_realtime_bars(
&contract,
WhatToShow::Trades,
true, &[], observer,
).context("Failed to request observed real-time bars")?;
info!("Real-time bars requested with ReqID: {}, ObserverID: {:?}. Waiting for data...",
req_id, observer_id);
let wait_duration = if is_live { Duration::from_secs(20) } else { Duration::from_secs(1) };
info!("Waiting for {:?} to capture streaming bars...", wait_duration);
std::thread::sleep(wait_duration);
info!("Cancelling real-time bars request (ReqID: {})...", req_id);
data_mgr.cancel_real_time_bars(req_id).context("Failed to cancel real-time bars")?;
info!("Removing real-time bars observer (ObserverID: {:?})...", observer_id);
data_mgr.remove_realtime_bars_observer(observer_id);
let final_count = {
let count = bar_count.lock().unwrap();
*count
};
info!("Real-time bars test completed. Received {} bars.", final_count);
if final_count == 0 && is_live {
warn!("Received 0 bars. This might be okay depending on market hours.");
}
Ok(())
}
pub(super) fn observe_tick_by_tick_impl(client: &IBKRClient, is_live: bool) -> Result<()> {
info!("--- Testing Observe Tick By Tick ---");
let data_mgr = client.data_market();
let contract = Contract::stock("GOOG");
#[derive(Debug)]
struct TestTickByTickObserver {
name: String,
tick_count: Arc<Mutex<usize>>,
error_occurred: std::sync::atomic::AtomicBool,
}
impl TickByTickObserver for TestTickByTickObserver {
fn on_tick_by_tick_all_last(&self, req_id: i32, tick_type: i32, time: i64, price: f64, size: f64,
_tick_attrib_last: &yatws::data::TickAttribLast,
_exchange: &str, _special_conditions: &str) {
info!("[{}] Tick Last: ReqID={}, Type={}, Time={}, Price={}, Size={}",
self.name, req_id, tick_type, time, price, size);
{
let mut count = self.tick_count.lock().unwrap();
*count += 1;
}
}
fn on_tick_by_tick_bid_ask(&self, req_id: i32, time: i64, bid_price: f64, ask_price: f64,
bid_size: f64, ask_size: f64,
_tick_attrib_bid_ask: &yatws::data::TickAttribBidAsk) {
info!("[{}] Tick BidAsk: ReqID={}, Time={}, Bid={}x{}, Ask={}x{}",
self.name, req_id, time, bid_price, bid_size, ask_price, ask_size);
{
let mut count = self.tick_count.lock().unwrap();
*count += 1;
}
}
fn on_tick_by_tick_mid_point(&self, req_id: i32, time: i64, mid_point: f64) {
info!("[{}] Tick MidPoint: ReqID={}, Time={}, MidPoint={}",
self.name, req_id, time, mid_point);
{
let mut count = self.tick_count.lock().unwrap();
*count += 1;
}
}
fn on_error(&self, req_id: i32, error_code: i32, error_message: &str) {
error!("[{}] Error: ReqID={}, Code={}, Msg='{}'",
self.name, req_id, error_code, error_message);
self.error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
}
}
let tick_count = Arc::new(Mutex::new(0));
let observer = TestTickByTickObserver {
name: "GOOG-TickByTick".to_string(),
tick_count: Arc::clone(&tick_count),
error_occurred: Default::default(),
};
info!("Requesting observed tick-by-tick data for {}...", contract.symbol);
let (req_id, observer_id) = data_mgr.request_observe_tick_by_tick(
&contract,
TickByTickRequestType::Last,
0, false, Some(MarketDataType::Delayed),
observer,
).context("Failed to request observed tick-by-tick data")?;
info!("Tick-by-tick data requested with ReqID: {}, ObserverID: {:?}. Waiting for data...",
req_id, observer_id);
let wait_duration = if is_live { Duration::from_secs(15) } else { Duration::from_secs(1) };
info!("Waiting for {:?} to capture streaming ticks...", wait_duration);
std::thread::sleep(wait_duration);
info!("Cancelling tick-by-tick request (ReqID: {})...", req_id);
data_mgr.cancel_tick_by_tick_data(req_id).context("Failed to cancel tick-by-tick data")?;
info!("Removing tick-by-tick observer (ObserverID: {:?})...", observer_id);
data_mgr.remove_tick_by_tick_observer(observer_id);
let final_count = {
let count = tick_count.lock().unwrap();
*count
};
info!("Tick-by-tick test completed. Received {} ticks.", final_count);
if final_count == 0 && is_live {
warn!("Received 0 ticks. This might be okay depending on market activity.");
}
Ok(())
}
pub(super) fn observe_market_depth_impl(client: &IBKRClient, is_live: bool) -> Result<()> {
info!("--- Testing Observe Market Depth ---");
let data_mgr = client.data_market();
let contract = Contract {
symbol: "EUR".to_string(),
exchange: "IDEALPRO".to_string(),
sec_type: SecType::Forex,
currency: "GBP".to_string(),
..Default::default()
};
#[derive(Debug)]
struct TestMarketDepthObserver {
name: String,
update_count: Arc<Mutex<usize>>,
error_occurred: std::sync::atomic::AtomicBool,
}
impl MarketDepthObserver for TestMarketDepthObserver {
fn on_update_mkt_depth(&self, req_id: i32, position: i32, operation: i32, side: i32,
price: f64, size: f64) {
info!("[{}] Depth L1: ReqID={}, Pos={}, Op={}, Side={}, Price={}, Size={}",
self.name, req_id, position, operation, side, price, size);
{
let mut count = self.update_count.lock().unwrap();
*count += 1;
}
}
fn on_update_mkt_depth_l2(&self, req_id: i32, position: i32, market_maker: &str,
operation: i32, side: i32, price: f64, size: f64,
is_smart_depth: bool) {
info!("[{}] Depth L2: ReqID={}, Pos={}, MM={}, Op={}, Side={}, Price={}, Size={}, Smart={}",
self.name, req_id, position, market_maker, operation, side, price, size, is_smart_depth);
{
let mut count = self.update_count.lock().unwrap();
*count += 1;
}
}
fn on_error(&self, req_id: i32, error_code: i32, error_message: &str) {
error!("[{}] Error: ReqID={}, Code={}, Msg='{}'",
self.name, req_id, error_code, error_message);
self.error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
}
}
let update_count = Arc::new(Mutex::new(0));
let observer = TestMarketDepthObserver {
name: "IBM-MarketDepth".to_string(),
update_count: Arc::clone(&update_count),
error_occurred: Default::default(),
};
info!("Requesting observed market depth for {}...", contract.symbol);
let (req_id, observer_id) = data_mgr.request_observe_market_depth(
&contract,
5, false, &[], observer,
).context("Failed to request observed market depth")?;
info!("Market depth requested with ReqID: {}, ObserverID: {:?}. Waiting for data...",
req_id, observer_id);
let wait_duration = if is_live { Duration::from_secs(15) } else { Duration::from_secs(1) };
info!("Waiting for {:?} to capture depth updates...", wait_duration);
std::thread::sleep(wait_duration);
info!("Cancelling market depth request (ReqID: {})...", req_id);
data_mgr.cancel_market_depth(req_id).context("Failed to cancel market depth")?;
info!("Removing market depth observer (ObserverID: {:?})...", observer_id);
data_mgr.remove_market_depth_observer(observer_id);
let final_count = {
let count = update_count.lock().unwrap();
*count
};
info!("Market depth test completed. Received {} updates.", final_count);
if final_count == 0 && is_live {
warn!("Received 0 depth updates. This might be okay depending on market activity.");
}
Ok(())
}
pub(super) fn observe_historical_data_impl(client: &IBKRClient, _is_live: bool) -> Result<()> {
info!("--- Testing Observe Historical Data ---");
let data_mgr = client.data_market();
let contract = Contract::stock("MSFT");
#[derive(Debug)]
struct TestHistoricalDataObserver {
name: String,
bar_count: Arc<Mutex<usize>>,
completed: Arc<Mutex<bool>>,
error_occurred: std::sync::atomic::AtomicBool,
}
impl HistoricalDataObserver for TestHistoricalDataObserver {
fn on_historical_data(&self, req_id: i32, bar: &yatws::contract::Bar) {
info!("[{}] Historical Bar: ReqID={}, Time={}, OHLC={},{},{},{}, Vol={}",
self.name, req_id, bar.time.format("%Y-%m-%d %H:%M:%S"),
bar.open, bar.high, bar.low, bar.close, bar.volume);
{
let mut count = self.bar_count.lock().unwrap();
*count += 1;
}
}
fn on_historical_data_update(&self, req_id: i32, bar: &yatws::contract::Bar) {
info!("[{}] Historical Bar Update: ReqID={}, Time={}, OHLC={},{},{},{}, Vol={}",
self.name, req_id, bar.time.format("%Y-%m-%d %H:%M:%S"),
bar.open, bar.high, bar.low, bar.close, bar.volume);
}
fn on_historical_data_end(&self, req_id: i32, start_date: Option<DateTime<Utc>>, end_date: Option<DateTime<Utc>>) {
info!("[{}] Historical Data End: ReqID={}, Start={:?}, End={:?}",
self.name, req_id, start_date, end_date);
{
let mut completed = self.completed.lock().unwrap();
*completed = true;
}
}
fn on_error(&self, req_id: i32, error_code: i32, error_message: &str) {
error!("[{}] Error: ReqID={}, Code={}, Msg='{}'",
self.name, req_id, error_code, error_message);
self.error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
}
}
let bar_count = Arc::new(Mutex::new(0));
let completed = Arc::new(Mutex::new(false));
let observer = TestHistoricalDataObserver {
name: "MSFT-HistoricalData".to_string(),
bar_count: Arc::clone(&bar_count),
completed: Arc::clone(&completed),
error_occurred: Default::default(),
};
info!("Requesting observed historical data for {}...", contract.symbol);
let (req_id, observer_id) = data_mgr.request_observe_historical_data(
&contract,
None, yatws::data::DurationUnit::Day(3), BarSize::OneHour,
WhatToShow::Trades,
true, 1, false, Some(MarketDataType::Delayed),
&[], observer,
).context("Failed to request observed historical data")?;
info!("Historical data requested with ReqID: {}, ObserverID: {:?}. Waiting for completion...",
req_id, observer_id);
let timeout = Duration::from_secs(30);
let start_time = std::time::Instant::now();
let mut is_completed = false;
while !is_completed && start_time.elapsed() < timeout {
std::thread::sleep(Duration::from_millis(100));
{
let completed_guard = completed.lock().unwrap();
is_completed = *completed_guard;
}
}
info!("Removing historical data observer (ObserverID: {:?})...", observer_id);
data_mgr.remove_historical_data_observer(observer_id);
let final_count = {
let count = bar_count.lock().unwrap();
*count
};
info!("Historical data test completed. Received {} bars, Completed={}.",
final_count, is_completed);
if !is_completed {
warn!("Historical data request did not complete within timeout. This might indicate issues.");
}
if final_count == 0 && is_completed {
warn!("Received 0 bars but request completed. This might be okay for the specified date range.");
}
Ok(())
}
pub(super) fn observe_historical_ticks_impl(client: &IBKRClient, _is_live: bool) -> Result<()> {
info!("--- Testing Observe Historical Ticks ---");
let data_mgr = client.data_market();
let contract = Contract::stock("AAPL");
#[derive(Debug)]
struct TestHistoricalTicksObserver {
name: String,
midpoint_count: Arc<Mutex<usize>>,
bidask_count: Arc<Mutex<usize>>,
last_count: Arc<Mutex<usize>>,
completed: Arc<Mutex<bool>>,
error_occurred: std::sync::atomic::AtomicBool,
}
impl HistoricalTicksObserver for TestHistoricalTicksObserver {
fn on_historical_ticks_midpoint(&self, req_id: i32, ticks: &[(i64, f64, f64)], done: bool) {
info!("[{}] Historical Ticks Midpoint: ReqID={}, Count={}, Done={}",
self.name, req_id, ticks.len(), done);
{
let mut count = self.midpoint_count.lock().unwrap();
*count += ticks.len();
}
if done {
let mut completed = self.completed.lock().unwrap();
*completed = true;
}
}
fn on_historical_ticks_bid_ask(&self, req_id: i32,
ticks: &[(i64, yatws::data::TickAttribBidAsk, f64, f64, f64, f64)],
done: bool) {
info!("[{}] Historical Ticks BidAsk: ReqID={}, Count={}, Done={}",
self.name, req_id, ticks.len(), done);
{
let mut count = self.bidask_count.lock().unwrap();
*count += ticks.len();
}
if done {
let mut completed = self.completed.lock().unwrap();
*completed = true;
}
}
fn on_historical_ticks_last(&self, req_id: i32,
ticks: &[(i64, yatws::data::TickAttribLast, f64, f64, String, String)],
done: bool) {
info!("[{}] Historical Ticks Last: ReqID={}, Count={}, Done={}",
self.name, req_id, ticks.len(), done);
{
let mut count = self.last_count.lock().unwrap();
*count += ticks.len();
}
if done {
let mut completed = self.completed.lock().unwrap();
*completed = true;
}
}
fn on_error(&self, req_id: i32, error_code: i32, error_message: &str) {
error!("[{}] Error: ReqID={}, Code={}, Msg='{}'",
self.name, req_id, error_code, error_message);
self.error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
}
}
let midpoint_count = Arc::new(Mutex::new(0));
let bidask_count = Arc::new(Mutex::new(0));
let last_count = Arc::new(Mutex::new(0));
let completed = Arc::new(Mutex::new(false));
let observer = TestHistoricalTicksObserver {
name: "AAPL-HistoricalTicks".to_string(),
midpoint_count: Arc::clone(&midpoint_count),
bidask_count: Arc::clone(&bidask_count),
last_count: Arc::clone(&last_count),
completed: Arc::clone(&completed),
error_occurred: Default::default(),
};
info!("Requesting observed historical ticks for {}...", contract.symbol);
let (req_id, observer_id) = data_mgr.request_observe_historical_ticks(
&contract,
None, Some(Utc::now()), 10, WhatToShow::Trades, false, false, &[], observer,
).context("Failed to request observed historical ticks")?;
info!("Historical ticks requested with ReqID: {}, ObserverID: {:?}. Waiting for completion...",
req_id, observer_id);
let timeout = Duration::from_secs(30);
let start_time = std::time::Instant::now();
let mut is_completed = false;
while !is_completed && start_time.elapsed() < timeout {
std::thread::sleep(Duration::from_millis(100));
{
let completed_guard = completed.lock().unwrap();
is_completed = *completed_guard;
}
}
info!("Removing historical ticks observer (ObserverID: {:?})...", observer_id);
data_mgr.remove_historical_ticks_observer(observer_id);
let last_ticks = {
let count = last_count.lock().unwrap();
*count
};
info!("Historical ticks test completed:");
info!(" Last Ticks: {}", last_ticks);
info!(" Completed: {}", is_completed);
if !is_completed {
warn!("Historical ticks request did not complete within timeout. This might indicate issues.");
}
if last_ticks == 0 && is_completed {
warn!("Received 0 ticks but request completed. This might be okay for the specified date range.");
}
Ok(())
}