from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import threading
import time
class IBapi(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.data = {}
def tickPrice(self, reqId, tickType, price, attrib):
self.data[reqId] = self.data.get(reqId, {})
if tickType == 1:
self.data[reqId]['bid'] = price
elif tickType == 2:
self.data[reqId]['ask'] = price
elif tickType == 4:
self.data[reqId]['last'] = price
print(f"Stock Data Update for {reqId}: {self.data[reqId]}")
def run_loop():
app.run()
app = IBapi()
app.connect('127.0.0.1', 3002, 0)
api_thread = threading.Thread(target=run_loop, daemon=True)
api_thread.start()
time.sleep(1)
def create_stock_contract(symbol):
contract = Contract()
contract.symbol = symbol
contract.secType = 'STK'
contract.exchange = 'SMART'
contract.currency = 'USD'
return contract
def request_market_data(symbol, req_id):
contract = create_stock_contract(symbol)
app.reqMktData(req_id, contract, '', False, False, [])
symbol = 'AAPL' request_market_data(symbol, 1)
try:
while True:
time.sleep(0.5)
except KeyboardInterrupt:
print("Canceling market data request...")
app.cancelMktData(1)
app.disconnect()
print("Disconnected")