mod cancellations;
mod lifecycle;
mod orders;
mod reports;
mod responses;
pub(crate) mod identity;
pub mod order_builder;
pub(crate) mod order_fill_tracker;
pub mod parse;
pub(crate) mod pending;
pub(crate) mod reconciliation;
pub(crate) mod submitter;
pub(crate) mod types;
use std::sync::{Arc, Mutex, atomic::AtomicBool};
use anyhow::Context;
use async_trait::async_trait;
use nautilus_common::{
clients::ExecutionClient,
messages::execution::{
BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
},
msgbus::TypedHandler,
};
use nautilus_core::{
UnixNanos,
collections::AtomicMap,
time::{AtomicTime, get_atomic_clock_realtime},
};
use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
use nautilus_model::{
accounts::AccountAny,
enums::{AccountType, LiquiditySide, OmsType},
events::{OrderEventAny, PositionEvent},
identifiers::{
AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
},
instruments::InstrumentAny,
reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
types::{AccountBalance, MarginBalance, Money, Price, Quantity},
};
use nautilus_network::retry::RetryConfig;
use tokio::task::JoinHandle;
use ustr::Ustr;
pub(crate) use self::reports::get_pusd_currency;
use self::{
identity::OrderIdentityRegistry,
order_builder::PolymarketOrderBuilder,
order_fill_tracker::OrderFillTrackerMap,
pending::{PendingCancelTracker, PendingSubmitTracker},
submitter::OrderSubmitter,
};
use crate::{
common::{consts::POLYMARKET_VENUE, credential::Secrets, enums::SignatureType},
config::PolymarketExecClientConfig,
http::{clob::PolymarketClobHttpClient, data_api::PolymarketDataApiHttpClient},
signing::eip712::OrderSigner,
websocket::client::PolymarketWebSocketClient,
};
#[derive(Debug)]
pub struct PolymarketExecutionClient {
core: ExecutionClientCore,
clock: &'static AtomicTime,
config: PolymarketExecClientConfig,
emitter: ExecutionEventEmitter,
http_client: PolymarketClobHttpClient,
data_api_client: PolymarketDataApiHttpClient,
submitter: OrderSubmitter,
ws_client: PolymarketWebSocketClient,
secrets: Secrets,
pending_tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
stopping: Arc<AtomicBool>,
ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
order_event_handler: Option<TypedHandler<OrderEventAny>>,
position_event_handler: Option<TypedHandler<PositionEvent>>,
shared_token_instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
neg_risk_index: Arc<AtomicMap<InstrumentId, bool>>,
pending_submits: PendingSubmitTracker,
pending_cancels: PendingCancelTracker,
order_identities: Arc<OrderIdentityRegistry>,
fill_tracker: Arc<OrderFillTrackerMap>,
}
impl PolymarketExecutionClient {
pub fn new(
core: ExecutionClientCore,
config: PolymarketExecClientConfig,
) -> anyhow::Result<Self> {
let secrets = Secrets::resolve(
config.private_key.as_deref(),
config.api_key.clone(),
config.api_secret.clone(),
config.passphrase.clone(),
config.funder.clone(),
)
.context("failed to resolve Polymarket credentials")?;
let signer_address = secrets.address.clone();
let maker_address = secrets
.funder
.clone()
.unwrap_or_else(|| signer_address.clone());
if config.signature_type == SignatureType::Poly1271
&& maker_address.eq_ignore_ascii_case(&signer_address)
{
anyhow::bail!(
"POLY_1271 signature type requires a deposit wallet funder distinct from the signing address"
);
}
let http_client = PolymarketClobHttpClient::new(
secrets.credential.clone(),
signer_address.clone(),
config.base_url_http.clone(),
config.http_timeout_secs,
)
.map_err(|e| anyhow::anyhow!("{e}"))
.context("failed to create CLOB HTTP client")?;
let data_api_client =
PolymarketDataApiHttpClient::new(Some(config.data_api_url()), config.http_timeout_secs)
.map_err(|e| anyhow::anyhow!("{e}"))
.context("failed to create Data API HTTP client")?;
let order_signer =
OrderSigner::new(&secrets.private_key).context("failed to create order signer")?;
let order_builder = Arc::new(PolymarketOrderBuilder::new(
order_signer,
signer_address,
maker_address,
config.signature_type,
));
let retry_config = RetryConfig {
max_retries: config.max_retries,
initial_delay_ms: config.retry_delay_initial_ms,
max_delay_ms: config.retry_delay_max_ms,
backoff_factor: 2.0,
jitter_ms: 1_000,
operation_timeout_ms: Some(config.http_timeout_secs * 1_000),
immediate_first: false,
max_elapsed_ms: Some(180_000),
};
let submitter = OrderSubmitter::new(http_client.clone(), order_builder, retry_config);
let ws_client = PolymarketWebSocketClient::new_user(
config.base_url_ws.clone(),
secrets.credential.clone(),
config.transport_backend,
);
let clock = get_atomic_clock_realtime();
let pusd = get_pusd_currency();
let emitter = ExecutionEventEmitter::new(
clock,
core.trader_id,
core.account_id,
AccountType::Cash,
Some(pusd),
);
Ok(Self {
core,
clock,
config,
emitter,
http_client,
data_api_client,
submitter,
ws_client,
secrets,
pending_tasks: Arc::new(Mutex::new(Vec::new())),
stopping: Arc::new(AtomicBool::new(false)),
ws_stream_handle: Mutex::new(None),
order_event_handler: None,
position_event_handler: None,
shared_token_instruments: Arc::new(AtomicMap::new()),
neg_risk_index: Arc::new(AtomicMap::new()),
pending_submits: PendingSubmitTracker::default(),
pending_cancels: PendingCancelTracker::default(),
order_identities: Arc::new(OrderIdentityRegistry::default()),
fill_tracker: Arc::new(OrderFillTrackerMap::new()),
})
}
}
#[async_trait(?Send)]
impl ExecutionClient for PolymarketExecutionClient {
fn is_connected(&self) -> bool {
self.core.is_connected()
}
fn client_id(&self) -> ClientId {
self.core.client_id
}
fn account_id(&self) -> AccountId {
self.core.account_id
}
fn venue(&self) -> Venue {
*POLYMARKET_VENUE
}
fn oms_type(&self) -> OmsType {
OmsType::Netting
}
fn get_account(&self) -> Option<AccountAny> {
self.core.cache().account_owned(&self.core.account_id)
}
fn generate_account_state(
&self,
balances: Vec<AccountBalance>,
margins: Vec<MarginBalance>,
reported: bool,
ts_event: UnixNanos,
) -> anyhow::Result<()> {
self.emitter
.emit_account_state(balances, margins, reported, ts_event);
Ok(())
}
fn start(&mut self) -> anyhow::Result<()> {
self.start_client();
Ok(())
}
fn stop(&mut self) -> anyhow::Result<()> {
self.stop_client();
Ok(())
}
fn reset(&mut self) -> anyhow::Result<()> {
self.reset_client();
Ok(())
}
fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
self.submit_order_command(&cmd)
}
fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
self.submit_order_list_command(&cmd);
Ok(())
}
fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
self.modify_order_command(&cmd);
Ok(())
}
fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
self.cancel_order_command(&cmd);
Ok(())
}
fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
self.cancel_all_orders_command(&cmd);
Ok(())
}
fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
self.batch_cancel_orders_command(&cmd);
Ok(())
}
fn query_account(&self, cmd: QueryAccount) -> anyhow::Result<()> {
self.query_account_command(cmd);
Ok(())
}
fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
self.query_order_command(&cmd);
Ok(())
}
fn register_external_order(
&self,
_client_order_id: ClientOrderId,
_venue_order_id: VenueOrderId,
_instrument_id: InstrumentId,
_strategy_id: StrategyId,
_ts_init: UnixNanos,
) {
}
fn on_instrument(&mut self, instrument: InstrumentAny) {
self.on_instrument_update(&instrument);
}
fn calculate_commission(
&self,
instrument: &InstrumentAny,
last_qty: Quantity,
last_px: Price,
liquidity_side: LiquiditySide,
) -> Option<Money> {
Some(self.calculate_commission_impl(instrument, last_qty, last_px, liquidity_side))
}
async fn connect(&mut self) -> anyhow::Result<()> {
self.connect_client().await
}
async fn disconnect(&mut self) -> anyhow::Result<()> {
self.disconnect_client().await
}
async fn generate_order_status_report(
&self,
cmd: &GenerateOrderStatusReport,
) -> anyhow::Result<Option<OrderStatusReport>> {
self.generate_order_status_report_impl(cmd).await
}
async fn generate_order_status_reports(
&self,
cmd: &GenerateOrderStatusReports,
) -> anyhow::Result<Vec<OrderStatusReport>> {
self.generate_order_status_reports_impl(cmd).await
}
async fn generate_fill_reports(
&self,
cmd: GenerateFillReports,
) -> anyhow::Result<Vec<FillReport>> {
self.generate_fill_reports_impl(cmd).await
}
async fn generate_position_status_reports(
&self,
cmd: &GeneratePositionStatusReports,
) -> anyhow::Result<Vec<PositionStatusReport>> {
self.generate_position_status_reports_impl(cmd).await
}
async fn generate_mass_status(
&self,
lookback_mins: Option<u64>,
) -> anyhow::Result<Option<ExecutionMassStatus>> {
self.generate_mass_status_impl(lookback_mins).await
}
}