import argparse
import datetime
import logging
import sys
import time
import threading
from ibapi import wrapper
from ibapi.client import EClient
from ibapi.contract import Contract, ComboLeg
from ibapi.utils import iswrapper
# Connection defaults; main() uses them when --host / --port / --clientId
# are not given on the command line.
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 4002
DEFAULT_CLIENT_ID = 103

# Module logger: every record from DEBUG upwards is written to stdout with a
# timestamp, the logger name and the level in front of the message.
formatter = logging.Formatter(
    fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(formatter)

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
log.addHandler(handler)
class TestApp(wrapper.EWrapper, EClient):
    """Client/wrapper pair that requests a delayed quote for an ES combo.

    When the server delivers the first valid id (``nextValidId``) the app
    switches the market data type to delayed (type 3), pauses for a second
    and subscribes to market data for a four-leg ``BAG`` contract.  Tick
    callbacks belonging to that request are collected in ``quote_data``.
    ``quote_received`` is set when the first price tick arrives or when the
    request fails, so that another thread can block on it.

    Attributes:
        nextValidOrderId: next unused id; -1 until ``nextValidId`` fires.
        mkt_data_req_id: id of the market data request; -1 until it is sent.
        quote_data: tick values stored under ``"<kind>_<TICK_NAME>"`` and
            ``"<kind>_<tickType>"`` (kind is price/size/generic/string).
        quote_received: ``threading.Event`` described above.
        quote_complete: set by the caller to stop further event signalling.
        reqId2nErr: number of error callbacks seen per positive request id.
        _my_errors: last ``(errorCode, errorString)`` per positive request id.
    """

    # Value held by ``mkt_data_req_id`` while no request has been sent.  The
    # API also uses -1 as the id of messages that belong to no request, so
    # the sentinel must never be matched against an incoming ``reqId``.
    _NO_REQUEST = -1

    # Codes delivered through error() for the market data request that do
    # not terminate it.  10167 is the notice "Requested market data is not
    # subscribed. Displaying delayed market data.", which is the expected
    # outcome of a delayed-data test rather than a failure.
    _NON_FATAL_MKT_DATA_CODES = frozenset({10167})

    # Tick type id -> symbolic name.  Built once instead of on every call.
    # Ids 66-76 and 88 are the delayed counterparts of the live ticks; they
    # are what a delayed (type 3) subscription actually delivers.
    _TICK_NAMES = {
        0: "BID_SIZE",
        1: "BID",
        2: "ASK",
        3: "ASK_SIZE",
        4: "LAST",
        5: "LAST_SIZE",
        6: "HIGH",
        7: "LOW",
        8: "VOLUME",
        9: "CLOSE",
        10: "BID_OPTION_COMPUTATION",
        11: "ASK_OPTION_COMPUTATION",
        12: "LAST_OPTION_COMPUTATION",
        13: "MODEL_OPTION",
        14: "OPEN",
        15: "LOW_13_WEEK",
        16: "HIGH_13_WEEK",
        17: "LOW_26_WEEK",
        18: "HIGH_26_WEEK",
        19: "LOW_52_WEEK",
        20: "HIGH_52_WEEK",
        21: "AVG_VOLUME",
        45: "LAST_TIMESTAMP",
        49: "HALTED",
        66: "DELAYED_BID",
        67: "DELAYED_ASK",
        68: "DELAYED_LAST",
        69: "DELAYED_BID_SIZE",
        70: "DELAYED_ASK_SIZE",
        71: "DELAYED_LAST_SIZE",
        72: "DELAYED_HIGH",
        73: "DELAYED_LOW",
        74: "DELAYED_VOLUME",
        75: "DELAYED_CLOSE",
        76: "DELAYED_OPEN",
        88: "DELAYED_LAST_TIMESTAMP",
    }

    # Exchange and (conId, action) of each leg of the combo, in leg order.
    _COMBO_EXCHANGE = "CME"
    _COMBO_LEGS = (
        (654505646, "BUY"),
        (654505738, "SELL"),
        (654505869, "BUY"),
        (654505848, "SELL"),
    )

    def __init__(self):
        wrapper.EWrapper.__init__(self)
        EClient.__init__(self, wrapper=self)
        self.nKeybInt = 0
        self.started = False
        self.nextValidOrderId = -1
        self.permId2ord = {}
        self.reqId2nErr = {}
        self.globalCancelOnly = False
        self._my_errors = {}
        self.mkt_data_req_id = self._NO_REQUEST
        self.quote_data = {}
        self.quote_received = threading.Event()
        self.quote_complete = False

    @iswrapper
    def connectAck(self):
        """Log that the connection was acknowledged."""
        log.info("Connection acknowledged.")

    @iswrapper
    def nextValidId(self, orderId: int):
        """Remember the first usable id and kick off the test requests."""
        super().nextValidId(orderId)
        self.nextValidOrderId = orderId
        log.info("nextValidId: %d", orderId)
        self.start()

    def start(self):
        """Switch to delayed data and send the quote request, once only."""
        if self.started:
            return
        self.started = True
        log.info("Setting market data type to delayed")
        self.reqMarketDataType(3)
        # NOTE(review): start() is invoked from the nextValidId callback, so
        # this pause also delays whatever thread delivers the callbacks.
        # Presumably it gives the server time to apply the type switch.
        time.sleep(1)
        log.info("Executing requests")
        self.request_delayed_quote_test()
        log.info("Requests finished")

    def keyboardInterrupt(self):
        """Disconnect on the first interrupt, exit the process on the next."""
        self.nKeybInt += 1
        if self.nKeybInt == 1:
            log.info("Keyboard interrupt detected. Disconnecting...")
            # Stop flag; presumably consulted by EClient.run() - confirm
            # against the installed ibapi version.
            self.done = True
            self.disconnect()
        else:
            log.info("Forcing exit...")
            sys.exit(0)

    @iswrapper
    def error(self, reqId: int, errorCode: int, errorString: str, advancedOrderReject=""):
        """Log an API message and release the waiter if the quote request failed.

        NOTE(review): the parameter list has to match the ``EWrapper.error``
        signature of the installed ibapi release - confirm when upgrading.

        Args:
            reqId: id of the request the message refers to.
            errorCode: API message code.
            errorString: human readable message text.
            advancedOrderReject: optional extra reject information.
        """
        super().error(reqId, errorCode, errorString, advancedOrderReject)
        if advancedOrderReject:
            log.error("Error. Id: %d, Code: %d, Msg: %s, AdvancedOrderReject: %s", reqId, errorCode, errorString, advancedOrderReject)
        else:
            log.error("Error. Id: %d, Code: %d, Msg: %s", reqId, errorCode, errorString)

        # Only treat the message as belonging to the quote request once a
        # request id has really been assigned; otherwise every message sent
        # with id -1 would match the unset sentinel and wake the waiter.
        request_sent = self.mkt_data_req_id != self._NO_REQUEST
        if request_sent and reqId == self.mkt_data_req_id:
            if errorCode in self._NON_FATAL_MKT_DATA_CODES:
                # The subscription stays alive; keep waiting for ticks.
                log.warning("Market data request %d: non-fatal notice %d: %s", reqId, errorCode, errorString)
            else:
                log.error(f"Market data request {reqId} failed with error {errorCode}: {errorString}")
                self.quote_received.set()

        if reqId > 0:
            self.reqId2nErr[reqId] = self.reqId2nErr.get(reqId, 0) + 1
            self._my_errors[reqId] = (errorCode, errorString)

    @iswrapper
    def marketDataType(self, reqId: int, marketDataType: int):
        """Log the market data type reported for a request."""
        super().marketDataType(reqId, marketDataType)
        log.info("Market data type set. ReqId: %d, Type: %d", reqId, marketDataType)

    @iswrapper
    def tickPrice(self, reqId: int, tickType: int, price: float, attrib):
        """Store a price tick; the first one for our request sets the event."""
        super().tickPrice(reqId, tickType, price, attrib)
        log.info("CALLBACK tickPrice: ReqId: %d, TickType: %d, Price: %f", reqId, tickType, price)
        recorded = self._record_tick("price", reqId, tickType, price)
        if recorded and not self.quote_complete:
            self.quote_received.set()

    @iswrapper
    def tickSize(self, reqId: int, tickType: int, size: int):
        """Store a size tick belonging to our request."""
        super().tickSize(reqId, tickType, size)
        log.info("CALLBACK tickSize: ReqId: %d, TickType: %d, Size: %d", reqId, tickType, size)
        self._record_tick("size", reqId, tickType, size)

    @iswrapper
    def tickGeneric(self, reqId: int, tickType: int, value: float):
        """Store a generic numeric tick belonging to our request."""
        super().tickGeneric(reqId, tickType, value)
        log.info("CALLBACK tickGeneric: ReqId: %d, TickType: %d, Value: %f", reqId, tickType, value)
        self._record_tick("generic", reqId, tickType, value)

    @iswrapper
    def tickString(self, reqId: int, tickType: int, value: str):
        """Store a string tick belonging to our request."""
        super().tickString(reqId, tickType, value)
        log.info("CALLBACK tickString: ReqId: %d, TickType: %d, Value: %s", reqId, tickType, value)
        self._record_tick("string", reqId, tickType, value)

    def _record_tick(self, kind: str, reqId: int, tickType: int, value) -> bool:
        """Store ``value`` in ``quote_data`` if the tick is for our request.

        The value is written under both the symbolic and the numeric key so
        that consumers can look it up either way.

        Returns:
            True when the tick was stored, False when it was for another id.
        """
        if reqId != self.mkt_data_req_id:
            return False
        tick_name = self.get_tick_type_name(tickType)
        self.quote_data[f"{kind}_{tick_name}"] = value
        self.quote_data[f"{kind}_{tickType}"] = value
        return True

    def get_tick_type_name(self, tick_type: int) -> str:
        """Return the symbolic name of a tick type id.

        Ids missing from the table are rendered as ``UNKNOWN_<id>``.
        """
        return self._TICK_NAMES.get(tick_type, f"UNKNOWN_{tick_type}")

    @staticmethod
    def _make_combo_leg(con_id: int, action: str, exchange: str):
        """Build one ratio-1 combo leg with the fixed settings of this test."""
        leg = ComboLeg()
        leg.conId = con_id
        leg.ratio = 1
        leg.action = action
        leg.exchange = exchange
        leg.openClose = 0
        leg.shortSaleSlot = 0
        leg.designatedLocation = ""
        leg.exemptCode = -1
        return leg

    def _build_combo_contract(self):
        """Build the four-leg ES ``BAG`` contract quoted by the test."""
        contract = Contract()
        contract.symbol = "ES"
        contract.secType = "BAG"
        contract.exchange = self._COMBO_EXCHANGE
        contract.currency = "USD"
        contract.comboLegs = [
            self._make_combo_leg(con_id, action, self._COMBO_EXCHANGE)
            for con_id, action in self._COMBO_LEGS
        ]
        return contract

    def request_delayed_quote_test(self):
        """Allocate a request id and subscribe to the combo's market data.

        Takes the id from ``nextValidOrderId`` (and advances it), stores it
        in ``mkt_data_req_id`` and sends a streaming, non-snapshot request.
        Responses are delivered through the tick and error callbacks.
        """
        self.mkt_data_req_id = self.nextValidOrderId
        self.nextValidOrderId += 1
        log.info(f"Requesting delayed quote with reqId: {self.mkt_data_req_id}")

        contract = self._build_combo_contract()
        log.info(f"Requesting Market Data for {contract.symbol} combo contract:")
        log.info(f" SecType: {contract.secType}, Exchange: {contract.exchange}, Currency: {contract.currency}")
        log.info(f" Number of combo legs: {len(contract.comboLegs)}")
        for i, leg in enumerate(contract.comboLegs):
            log.info(f" Leg {i+1}: ConId={leg.conId}, Ratio={leg.ratio}, Action={leg.action}, Exchange={leg.exchange}")

        genericTickList = ""
        snapshot = False
        regulatorySnapshot = False
        mktDataOptions = []
        self.reqMktData(self.mkt_data_req_id, contract, genericTickList,
                        snapshot, regulatorySnapshot, mktDataOptions)
        log.info(f"Market data request {self.mkt_data_req_id} sent. Background thread will process response.")
def main():
    """Run the delayed quote test from the command line.

    Parses ``--host``, ``--port`` and ``--clientId``, connects a ``TestApp``,
    runs its message loop in a daemon thread and waits up to 30 seconds for
    the app to signal ``quote_received``.  After the signal it keeps
    collecting ticks for 10 more seconds, then cancels the subscription,
    disconnects and joins the message loop thread.

    Exceptions are logged rather than propagated; the connection is closed
    on every exit path, including Ctrl-C.
    """
    parser = argparse.ArgumentParser(description="IB API Delayed Quote Test for ES Combo")
    parser.add_argument("--host", default=DEFAULT_HOST, help="Host address")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Port number")
    parser.add_argument("--clientId", type=int, default=DEFAULT_CLIENT_ID, help="Client ID")
    args = parser.parse_args()
    log.info("Starting Delayed Quote Test for ES Combo Contract")
    log.info(f"Connecting to {args.host}:{args.port} with clientId {args.clientId}")

    app = None
    try:
        app = TestApp()
        log.info(f"Logger level set to: {logging.getLevelName(log.level)}")
        app.connect(args.host, args.port, args.clientId)
        if not app.isConnected():
            # A failed connect is reported through the error callback, not
            # by raising, so it has to be detected here; there is nothing
            # to wait for without a connection.
            log.error("Could not connect to %s:%d; aborting.", args.host, args.port)
            return
        log.info("Connection initiated. Server version: %s", app.serverVersion())

        log.info("Starting EClient.run() message loop in background thread...")
        thread = threading.Thread(target=app.run, daemon=True)
        thread.start()
        log.info("EClient.run() thread started.")

        wait_timeout_secs = 30
        log.info(f"Main thread waiting up to {wait_timeout_secs}s for quote data (reqId: {app.mkt_data_req_id})...")
        signalled = app.quote_received.wait(timeout=wait_timeout_secs)
        if signalled:
            if app.quote_data:
                log.info(f"Main thread: Quote data received for request {app.mkt_data_req_id}.")
            else:
                # The event is also set when the request errors out, so a
                # signal without any stored tick is not a received quote.
                log.warning(
                    "Main thread: wait for request %d ended without tick data (last error: %s).",
                    app.mkt_data_req_id,
                    app._my_errors.get(app.mkt_data_req_id),
                )
            _log_quote_data("Quote data summary:", app.quote_data)
            # Keep listening in both cases: ticks may still arrive.
            log.info("Collecting data for 10 more seconds...")
            time.sleep(10)
            app.quote_complete = True
            _log_quote_data("Final quote data:", app.quote_data)
        else:
            log.warning(f"Main thread: Quote request {app.mkt_data_req_id} timed out!")
            log.warning(f" Received data: {app.quote_data}")

        log.info(f"Cancelling market data subscription {app.mkt_data_req_id}...")
        # -1 means the request was never sent; there is nothing to cancel.
        if app.isConnected() and app.mkt_data_req_id != -1:
            app.cancelMktData(app.mkt_data_req_id)
            time.sleep(1)

        log.info("Main thread: Disconnecting...")
        if app.isConnected():
            app.disconnect()
        else:
            log.info("Main thread: Already disconnected.")

        join_timeout_secs = 10
        log.info(f"Main thread waiting for EClient thread to join (timeout {join_timeout_secs}s)...")
        thread.join(timeout=join_timeout_secs)
        log.info("Main thread finished waiting for EClient thread.")
        if thread.is_alive():
            log.warning("EClient thread did not exit cleanly after disconnect and join timeout.")
            if app.isConnected():
                log.warning("Attempting disconnect again...")
                app.disconnect()
        else:
            log.info("EClient thread exited cleanly.")
        log.info("Delayed Quote Test finished.")
    except Exception:
        log.exception("Unhandled exception in main:")
    finally:
        # Safety net for exceptions and Ctrl-C: never leave the connection
        # open behind us.
        if app is not None and app.isConnected():
            app.disconnect()
        log.info("Exiting.")


def _log_quote_data(title, quote_data):
    """Log ``title`` followed by one `` key: value`` line per entry."""
    log.info(title)
    for key, value in quote_data.items():
        log.info(f" {key}: {value}")
# Run the test only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()