from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
from ibapi.ticktype import TickType, TickTypeEnum
import threading
import time
class IBapi(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.next_req_id = 1
def error(self, reqId, errorCode, errorString, advancedOrderReject=""):
if reqId > 0:
print(f"Error. Id: {reqId}, Code: {errorCode}, Msg: {errorString}, Advanced Order Reject: {advancedOrderReject}")
else:
print(f"System Error. Code: {errorCode}, Msg: {errorString}")
def nextValidId(self, orderId: int):
super().nextValidId(orderId)
self.next_req_id = orderId print(f"Connection acknowledged. Next valid ID: {self.next_req_id}")
def tickPrice(self, reqId, tickType, price, attrib):
tick_name = TickTypeEnum.idx2name.get(tickType, f"UnknownTick({tickType})")
print(f"Tick Price. ReqId: {reqId}, Type: {tick_name}({tickType}), Price: {price}, Attr: {attrib}")
def tickSize(self, reqId, tickType, size):
tick_name = TickTypeEnum.idx2name.get(tickType, f"UnknownTick({tickType})")
print(f"Tick Size. ReqId: {reqId}, Type: {tick_name}({tickType}), Size: {size}")
def run_loop(app_instance):
print("Starting IB API message loop...")
app_instance.run()
print("IB API message loop finished.")
if __name__ == "__main__":
app = IBapi()
host = '127.0.0.1'
port = 3002 client_id = 102
print(f"Connecting to {host}:{port} with Client ID: {client_id}...")
app.connect(host, port, client_id)
api_thread = threading.Thread(target=run_loop, args=(app,), daemon=True)
api_thread.start()
print("Waiting for connection confirmation...")
time.sleep(2)
if not app.isConnected():
print("Connection failed!")
exit()
contract = Contract()
contract.symbol = "SPY" contract.secType = "STK"
contract.exchange = "SMART"
contract.currency = "USD"
req_id = app.next_req_id print(f"\nRequesting streaming market data for {contract.symbol} with Req ID: {req_id}...")
generic_tick_list = ""
snapshot = False regulatory_snapshot = False mkt_data_options = []
app.reqMktData(req_id, contract, generic_tick_list, snapshot, regulatory_snapshot, mkt_data_options)
stream_duration = 15 print(f"\nStreaming data for {stream_duration} seconds...")
time.sleep(stream_duration)
print(f"\nCancelling market data request (Req ID: {req_id})...")
app.cancelMktData(req_id)
time.sleep(1)
print("Disconnecting...")
app.disconnect()
api_thread.join(timeout=2)
print("Program finished.")