from websocket import WebSocketApp
import json
import time
import threading
MARKET_CHANNEL = "market"
class WebSocketMarketChannel:
def __init__(self, url, asset_ids):
self.url = url.rstrip("/")
self.asset_ids = asset_ids
self.ws_url = f"{self.url}/ws/{MARKET_CHANNEL}"
self.ws = WebSocketApp(
self.ws_url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
)
def on_open(self, ws):
ws.send(json.dumps({"assets_ids": self.asset_ids, "type": MARKET_CHANNEL}))
ping_thread = threading.Thread(target=self._ping_forever, args=(ws,), daemon=True)
ping_thread.start()
def on_message(self, ws, message):
print(message)
def on_error(self, ws, error):
print("Error:", error)
ws.close()
def on_close(self, ws, close_status_code, close_msg):
print("Connection closed:", close_status_code, close_msg)
def _ping_forever(self, ws):
while True:
try:
ws.send("PING")
except Exception as exc:
print("Ping failed:", exc)
break
time.sleep(10)
def run(self):
self.ws.run_forever()
if __name__ == "__main__":
url = "wss://ws-subscriptions-clob.polymarket.com"
asset_ids = [
"109681959945973300464568698402968596289258214226684818748321941747028805721376",
]
market_client = WebSocketMarketChannel(url, asset_ids)
market_client.run()