1use std::{
19 sync::atomic::Ordering,
20 time::{Duration, Instant},
21};
22
23use dashmap::DashMap;
24use nautilus_common::live::get_runtime;
25use nautilus_core::{
26 UUID4,
27 python::{call_python_threadsafe, to_pyvalue_err},
28 time::get_atomic_clock_realtime,
29};
30use nautilus_model::{
31 data::{BarType, Data, OrderBookDeltas_API},
32 enums::AccountType,
33 events::AccountState,
34 identifiers::{AccountId, InstrumentId},
35 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
36 types::{AccountBalance, Currency, Money},
37};
38use nautilus_network::mode::ConnectionMode;
39use pyo3::{IntoPyObjectExt, prelude::*, types::PyDict};
40
41use crate::{
42 common::{credential::DydxCredential, enums::DydxCandleResolution, parse::extract_raw_symbol},
43 execution::types::OrderContext,
44 http::{client::DydxHttpClient, parse::parse_account_state},
45 python::encoder::PyDydxClientOrderIdEncoder,
46 websocket::{
47 client::DydxWebSocketClient,
48 enums::NautilusWsMessage,
49 handler::HandlerCommand,
50 parse::{parse_ws_fill_report, parse_ws_order_report, parse_ws_position_report},
51 },
52};
53
54#[pymethods]
55impl DydxWebSocketClient {
56 #[staticmethod]
57 #[pyo3(name = "new_public")]
58 fn py_new_public(url: String, heartbeat: Option<u64>) -> Self {
59 Self::new_public(url, heartbeat)
60 }
61
62 #[staticmethod]
63 #[pyo3(name = "new_private")]
64 fn py_new_private(
65 url: String,
66 private_key: String,
67 authenticator_ids: Vec<u64>,
68 account_id: AccountId,
69 heartbeat: Option<u64>,
70 ) -> PyResult<Self> {
71 let credential = DydxCredential::from_private_key(&private_key, authenticator_ids)
72 .map_err(to_pyvalue_err)?;
73 Ok(Self::new_private(url, credential, account_id, heartbeat))
74 }
75
76 #[pyo3(name = "is_connected")]
77 fn py_is_connected(&self) -> bool {
78 self.is_connected()
79 }
80
81 #[pyo3(name = "set_bars_timestamp_on_close")]
82 fn py_set_bars_timestamp_on_close(&mut self, value: bool) {
83 self.set_bars_timestamp_on_close(value);
84 }
85
86 #[pyo3(name = "set_account_id")]
87 fn py_set_account_id(&mut self, account_id: AccountId) {
88 self.set_account_id(account_id);
89 }
90
91 #[pyo3(name = "share_instrument_cache")]
97 fn py_share_instrument_cache(&mut self, http_client: &DydxHttpClient) {
98 self.set_instrument_cache(http_client.instrument_cache().clone());
99 }
100
101 #[pyo3(name = "account_id")]
102 fn py_account_id(&self) -> Option<AccountId> {
103 self.account_id()
104 }
105
106 #[pyo3(name = "encoder")]
108 fn py_encoder(&self) -> PyDydxClientOrderIdEncoder {
109 PyDydxClientOrderIdEncoder::from_arc(self.encoder().clone())
110 }
111
112 #[getter]
113 fn py_url(&self) -> String {
114 self.url().to_string()
115 }
116
117 #[pyo3(name = "connect")]
118 fn py_connect<'py>(
119 &mut self,
120 py: Python<'py>,
121 loop_: Py<PyAny>,
122 instruments: Vec<Py<PyAny>>,
123 callback: Py<PyAny>,
124 ) -> PyResult<Bound<'py, PyAny>> {
125 let call_soon = loop_.getattr(py, "call_soon_threadsafe")?;
126
127 let mut instruments_any = Vec::new();
128 for inst in instruments {
129 let inst_any = pyobject_to_instrument_any(py, inst)?;
130 instruments_any.push(inst_any);
131 }
132
133 self.cache_instruments(instruments_any);
134
135 let mut client = self.clone();
136
137 pyo3_async_runtimes::tokio::future_into_py(py, async move {
138 client.connect().await.map_err(to_pyvalue_err)?;
139
140 if let Some(mut rx) = client.take_receiver() {
141 get_runtime().spawn(async move {
142 let _client = client; let clock = get_atomic_clock_realtime();
144 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
145 let order_id_map: DashMap<String, (u32, u32)> = DashMap::new();
146
147 while let Some(msg) = rx.recv().await {
148 match msg {
149 NautilusWsMessage::Data(items) => {
150 Python::attach(|py| {
151 for data in items {
152 let py_obj = data_to_pycapsule(py, data);
153 call_python_threadsafe(py, &call_soon, &callback, py_obj);
154 }
155 });
156 }
157 NautilusWsMessage::Deltas(deltas) => {
158 Python::attach(|py| {
159 let data = Data::Deltas(OrderBookDeltas_API::new(*deltas));
160 let py_obj = data_to_pycapsule(py, data);
161 call_python_threadsafe(py, &call_soon, &callback, py_obj);
162 });
163 }
164 NautilusWsMessage::BlockHeight { height, time } => {
165 Python::attach(|py| {
166 let dict = PyDict::new(py);
167 let _ = dict.set_item("type", "block_height");
168 let _ = dict.set_item("height", height);
169 let _ = dict.set_item("time", time.to_rfc3339());
170 if let Ok(py_obj) = dict.into_py_any(py) {
171 call_python_threadsafe(py, &call_soon, &callback, py_obj);
172 }
173 });
174 }
175 NautilusWsMessage::SubaccountSubscribed(data) => {
176 let Some(account_id) = _client.account_id() else {
177 log::warn!("Cannot parse subaccount subscription: account_id not set");
178 continue;
179 };
180
181 let instrument_cache = _client.instrument_cache();
182 let ts_init = clock.get_time_ns();
183
184 let inst_map = instrument_cache.to_instrument_id_map();
185 let oracle_map = instrument_cache.to_oracle_prices_map();
186
187 if let Some(ref subaccount) = data.contents.subaccount {
188 match parse_account_state(
189 subaccount,
190 account_id,
191 &inst_map,
192 &oracle_map,
193 ts_init,
194 ts_init,
195 ) {
196 Ok(account_state) => {
197 Python::attach(|py| {
198 match account_state.into_py_any(py) {
199 Ok(py_obj) => {
200 call_python_threadsafe(py, &call_soon, &callback, py_obj);
201 }
202 Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
203 }
204 });
205 }
206 Err(e) => log::error!("Failed to parse account state: {e}"),
207 }
208
209 if let Some(ref positions) = subaccount.open_perpetual_positions {
210 for (market, ws_position) in positions {
211 match parse_ws_position_report(
212 ws_position,
213 instrument_cache,
214 account_id,
215 ts_init,
216 ) {
217 Ok(report) => {
218 Python::attach(|py| {
219 match pyo3::Py::new(py, report) {
220 Ok(py_obj) => {
221 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
222 }
223 Err(e) => log::error!("Failed to convert PositionStatusReport to Python: {e}"),
224 }
225 });
226 }
227 Err(e) => log::error!("Failed to parse position for {market}: {e}"),
228 }
229 }
230 }
231 } else {
232 log::warn!("Subaccount subscription without initial state (new/empty subaccount)");
233
234 let currency = Currency::get_or_create_crypto_with_context("USDC", None);
236 let zero = Money::zero(currency);
237 let balance = AccountBalance::new_checked(zero, zero, zero)
238 .expect("zero balance should always be valid");
239 let account_state = AccountState::new(
240 account_id,
241 AccountType::Margin,
242 vec![balance],
243 vec![],
244 true,
245 UUID4::new(),
246 ts_init,
247 ts_init,
248 None,
249 );
250 Python::attach(|py| {
251 match account_state.into_py_any(py) {
252 Ok(py_obj) => {
253 call_python_threadsafe(py, &call_soon, &callback, py_obj);
254 }
255 Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
256 }
257 });
258 }
259 }
260 NautilusWsMessage::SubaccountsChannelData(data) => {
261 let Some(account_id) = _client.account_id() else {
262 log::warn!("Cannot parse SubaccountsChannelData: account_id not set");
263 continue;
264 };
265
266 let instrument_cache = _client.instrument_cache();
267 let encoder = _client.encoder();
268 let ts_init = clock.get_time_ns();
269
270 let mut terminal_orders: Vec<(u32, u32, String)> = Vec::new();
271
272 let mut pending_order_reports = Vec::new();
276
277 if let Some(ref orders) = data.contents.orders {
278 for ws_order in orders {
279 if let Ok(client_id_u32) = ws_order.client_id.parse::<u32>() {
281 let client_meta = ws_order.client_metadata
282 .as_ref()
283 .and_then(|s| s.parse::<u32>().ok())
284 .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
285 order_id_map.insert(ws_order.id.clone(), (client_id_u32, client_meta));
286 }
287
288 match parse_ws_order_report(
289 ws_order,
290 instrument_cache,
291 &order_contexts,
292 encoder,
293 account_id,
294 ts_init,
295 ) {
296 Ok(report) => {
297 if !report.order_status.is_open()
298 && let Ok(cid) = ws_order.client_id.parse::<u32>()
299 {
300 let meta = ws_order.client_metadata
301 .as_ref()
302 .and_then(|s| s.parse::<u32>().ok())
303 .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
304 terminal_orders.push((cid, meta, ws_order.id.clone()));
305 }
306 pending_order_reports.push(report);
307 }
308 Err(e) => log::error!("Failed to parse WS order: {e}"),
309 }
310 }
311 }
312
313 if let Some(ref fills) = data.contents.fills {
316 for ws_fill in fills {
317 match parse_ws_fill_report(
318 ws_fill,
319 instrument_cache,
320 &order_id_map,
321 &order_contexts,
322 encoder,
323 account_id,
324 ts_init,
325 ) {
326 Ok(report) => {
327 Python::attach(|py| {
328 match pyo3::Py::new(py, report) {
329 Ok(py_obj) => {
330 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
331 }
332 Err(e) => log::error!("Failed to convert FillReport: {e}"),
333 }
334 });
335 }
336 Err(e) => log::error!("Failed to parse WS fill: {e}"),
337 }
338 }
339 }
340
341 for report in pending_order_reports {
343 Python::attach(|py| {
344 match pyo3::Py::new(py, report) {
345 Ok(py_obj) => {
346 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
347 }
348 Err(e) => log::error!("Failed to convert OrderStatusReport: {e}"),
349 }
350 });
351 }
352
353 for (client_id, client_metadata, order_id) in terminal_orders {
355 order_contexts.remove(&client_id);
356 encoder.remove(client_id, client_metadata);
357 order_id_map.remove(&order_id);
358 }
359 }
360 NautilusWsMessage::MarkPrice(mark_price) => {
361 Python::attach(|py| {
362 match mark_price.into_py_any(py) {
363 Ok(py_obj) => {
364 call_python_threadsafe(py, &call_soon, &callback, py_obj);
365 }
366 Err(e) => log::error!("Failed to convert MarkPriceUpdate to Python: {e}"),
367 }
368 });
369 }
370 NautilusWsMessage::IndexPrice(index_price) => {
371 Python::attach(|py| {
372 match index_price.into_py_any(py) {
373 Ok(py_obj) => {
374 call_python_threadsafe(py, &call_soon, &callback, py_obj);
375 }
376 Err(e) => log::error!("Failed to convert IndexPriceUpdate to Python: {e}"),
377 }
378 });
379 }
380 NautilusWsMessage::FundingRate(funding_rate) => {
381 Python::attach(|py| {
382 match funding_rate.into_py_any(py) {
383 Ok(py_obj) => {
384 call_python_threadsafe(py, &call_soon, &callback, py_obj);
385 }
386 Err(e) => log::error!("Failed to convert FundingRateUpdate to Python: {e}"),
387 }
388 });
389 }
390 NautilusWsMessage::InstrumentStatus(status) => {
391 Python::attach(|py| {
392 match status.into_py_any(py) {
393 Ok(py_obj) => {
394 call_python_threadsafe(py, &call_soon, &callback, py_obj);
395 }
396 Err(e) => log::error!("Failed to convert InstrumentStatus to Python: {e}"),
397 }
398 });
399 }
400 NautilusWsMessage::Error(err) => {
401 log::error!("dYdX WebSocket error: {err}");
402 }
403 NautilusWsMessage::Reconnected => {
404 log::info!("dYdX WebSocket reconnected");
405 }
406 NautilusWsMessage::AccountState(state) => {
407 Python::attach(|py| {
408 match state.into_py_any(py) {
409 Ok(py_obj) => {
410 call_python_threadsafe(py, &call_soon, &callback, py_obj);
411 }
412 Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
413 }
414 });
415 }
416 NautilusWsMessage::Position(report) => {
417 Python::attach(|py| {
418 match pyo3::Py::new(py, *report) {
419 Ok(py_obj) => {
420 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
421 }
422 Err(e) => log::error!("Failed to convert PositionStatusReport to Python: {e}"),
423 }
424 });
425 }
426 NautilusWsMessage::Order(report) => {
427 Python::attach(|py| {
428 match pyo3::Py::new(py, *report) {
429 Ok(py_obj) => {
430 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
431 }
432 Err(e) => log::error!("Failed to convert OrderStatusReport to Python: {e}"),
433 }
434 });
435 }
436 NautilusWsMessage::Fill(report) => {
437 Python::attach(|py| {
438 match pyo3::Py::new(py, *report) {
439 Ok(py_obj) => {
440 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
441 }
442 Err(e) => log::error!("Failed to convert FillReport to Python: {e}"),
443 }
444 });
445 }
446 NautilusWsMessage::NewInstrumentDiscovered { ticker } => {
447 log::info!("New instrument discovered via WebSocket: {ticker}");
448 Python::attach(|py| {
449 let dict = PyDict::new(py);
450 let _ = dict.set_item("type", "new_instrument_discovered");
451 let _ = dict.set_item("ticker", &ticker);
452 if let Ok(py_obj) = dict.into_py_any(py) {
453 call_python_threadsafe(py, &call_soon, &callback, py_obj);
454 }
455 });
456 }
457 }
458 }
459 });
460 }
461
462 Ok(())
463 })
464 }
465
466 #[pyo3(name = "disconnect")]
467 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
468 let mut client = self.clone();
469 pyo3_async_runtimes::tokio::future_into_py(py, async move {
470 client.disconnect().await.map_err(to_pyvalue_err)?;
471 Ok(())
472 })
473 }
474
475 #[pyo3(name = "wait_until_active")]
476 fn py_wait_until_active<'py>(
477 &self,
478 py: Python<'py>,
479 timeout_secs: f64,
480 ) -> PyResult<Bound<'py, PyAny>> {
481 let connection_mode = self.connection_mode_atomic();
482
483 pyo3_async_runtimes::tokio::future_into_py(py, async move {
484 let timeout = Duration::from_secs_f64(timeout_secs);
485 let start = Instant::now();
486
487 loop {
488 let mode = connection_mode.load();
489 let mode_u8 = mode.load(Ordering::Relaxed);
490 let is_connected = matches!(
491 mode_u8,
492 x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
493 );
494
495 if is_connected {
496 break;
497 }
498
499 if start.elapsed() > timeout {
500 return Err(to_pyvalue_err(std::io::Error::new(
501 std::io::ErrorKind::TimedOut,
502 format!("Client did not become active within {timeout_secs}s"),
503 )));
504 }
505 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
506 }
507
508 Ok(())
509 })
510 }
511
512 #[pyo3(name = "cache_instrument")]
513 fn py_cache_instrument(&self, instrument: Py<PyAny>, py: Python<'_>) -> PyResult<()> {
514 let inst_any = pyobject_to_instrument_any(py, instrument)?;
515 self.cache_instrument(inst_any);
516 Ok(())
517 }
518
519 #[pyo3(name = "cache_instruments")]
520 fn py_cache_instruments(&self, instruments: Vec<Py<PyAny>>, py: Python<'_>) -> PyResult<()> {
521 let mut instruments_any = Vec::new();
522 for inst in instruments {
523 let inst_any = pyobject_to_instrument_any(py, inst)?;
524 instruments_any.push(inst_any);
525 }
526 self.cache_instruments(instruments_any);
527 Ok(())
528 }
529
530 #[pyo3(name = "is_closed")]
531 fn py_is_closed(&self) -> bool {
532 !self.is_connected()
533 }
534
535 #[pyo3(name = "subscribe_trades")]
536 fn py_subscribe_trades<'py>(
537 &self,
538 py: Python<'py>,
539 instrument_id: InstrumentId,
540 ) -> PyResult<Bound<'py, PyAny>> {
541 let client = self.clone();
542 pyo3_async_runtimes::tokio::future_into_py(py, async move {
543 client
544 .subscribe_trades(instrument_id)
545 .await
546 .map_err(to_pyvalue_err)?;
547 Ok(())
548 })
549 }
550
551 #[pyo3(name = "unsubscribe_trades")]
552 fn py_unsubscribe_trades<'py>(
553 &self,
554 py: Python<'py>,
555 instrument_id: InstrumentId,
556 ) -> PyResult<Bound<'py, PyAny>> {
557 let client = self.clone();
558 pyo3_async_runtimes::tokio::future_into_py(py, async move {
559 client
560 .unsubscribe_trades(instrument_id)
561 .await
562 .map_err(to_pyvalue_err)?;
563 Ok(())
564 })
565 }
566
567 #[pyo3(name = "subscribe_orderbook")]
568 fn py_subscribe_orderbook<'py>(
569 &self,
570 py: Python<'py>,
571 instrument_id: InstrumentId,
572 ) -> PyResult<Bound<'py, PyAny>> {
573 let client = self.clone();
574 pyo3_async_runtimes::tokio::future_into_py(py, async move {
575 client
576 .subscribe_orderbook(instrument_id)
577 .await
578 .map_err(to_pyvalue_err)?;
579 Ok(())
580 })
581 }
582
583 #[pyo3(name = "unsubscribe_orderbook")]
584 fn py_unsubscribe_orderbook<'py>(
585 &self,
586 py: Python<'py>,
587 instrument_id: InstrumentId,
588 ) -> PyResult<Bound<'py, PyAny>> {
589 let client = self.clone();
590 pyo3_async_runtimes::tokio::future_into_py(py, async move {
591 client
592 .unsubscribe_orderbook(instrument_id)
593 .await
594 .map_err(to_pyvalue_err)?;
595 Ok(())
596 })
597 }
598
599 #[pyo3(name = "subscribe_bars")]
600 fn py_subscribe_bars<'py>(
601 &self,
602 py: Python<'py>,
603 bar_type: BarType,
604 ) -> PyResult<Bound<'py, PyAny>> {
605 let spec = bar_type.spec();
606 let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
607 let resolution = resolution.to_string();
608
609 let client = self.clone();
610 let instrument_id = bar_type.instrument_id();
611
612 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
614 let topic = format!("{ticker}/{resolution}");
615
616 pyo3_async_runtimes::tokio::future_into_py(py, async move {
617 client
619 .send_command(HandlerCommand::RegisterBarType { topic, bar_type })
620 .map_err(to_pyvalue_err)?;
621
622 tokio::time::sleep(Duration::from_millis(50)).await;
624
625 client
626 .subscribe_candles(instrument_id, &resolution)
627 .await
628 .map_err(to_pyvalue_err)?;
629 Ok(())
630 })
631 }
632
633 #[pyo3(name = "unsubscribe_bars")]
634 fn py_unsubscribe_bars<'py>(
635 &self,
636 py: Python<'py>,
637 bar_type: BarType,
638 ) -> PyResult<Bound<'py, PyAny>> {
639 let spec = bar_type.spec();
640 let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
641 let resolution = resolution.to_string();
642
643 let client = self.clone();
644 let instrument_id = bar_type.instrument_id();
645
646 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
648 let topic = format!("{ticker}/{resolution}");
649
650 pyo3_async_runtimes::tokio::future_into_py(py, async move {
651 client
652 .unsubscribe_candles(instrument_id, &resolution)
653 .await
654 .map_err(to_pyvalue_err)?;
655
656 client
658 .send_command(HandlerCommand::UnregisterBarType { topic })
659 .map_err(to_pyvalue_err)?;
660
661 Ok(())
662 })
663 }
664
665 #[pyo3(name = "subscribe_markets")]
666 fn py_subscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
667 let client = self.clone();
668 pyo3_async_runtimes::tokio::future_into_py(py, async move {
669 client.subscribe_markets().await.map_err(to_pyvalue_err)?;
670 Ok(())
671 })
672 }
673
674 #[pyo3(name = "unsubscribe_markets")]
675 fn py_unsubscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
676 let client = self.clone();
677 pyo3_async_runtimes::tokio::future_into_py(py, async move {
678 client.unsubscribe_markets().await.map_err(to_pyvalue_err)?;
679 Ok(())
680 })
681 }
682
683 #[pyo3(name = "subscribe_subaccount")]
684 fn py_subscribe_subaccount<'py>(
685 &self,
686 py: Python<'py>,
687 address: String,
688 subaccount_number: u32,
689 ) -> PyResult<Bound<'py, PyAny>> {
690 let client = self.clone();
691 pyo3_async_runtimes::tokio::future_into_py(py, async move {
692 client
693 .subscribe_subaccount(&address, subaccount_number)
694 .await
695 .map_err(to_pyvalue_err)?;
696 Ok(())
697 })
698 }
699
700 #[pyo3(name = "unsubscribe_subaccount")]
701 fn py_unsubscribe_subaccount<'py>(
702 &self,
703 py: Python<'py>,
704 address: String,
705 subaccount_number: u32,
706 ) -> PyResult<Bound<'py, PyAny>> {
707 let client = self.clone();
708 pyo3_async_runtimes::tokio::future_into_py(py, async move {
709 client
710 .unsubscribe_subaccount(&address, subaccount_number)
711 .await
712 .map_err(to_pyvalue_err)?;
713 Ok(())
714 })
715 }
716
717 #[pyo3(name = "subscribe_block_height")]
718 fn py_subscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
719 let client = self.clone();
720 pyo3_async_runtimes::tokio::future_into_py(py, async move {
721 client
722 .subscribe_block_height()
723 .await
724 .map_err(to_pyvalue_err)?;
725 Ok(())
726 })
727 }
728
729 #[pyo3(name = "unsubscribe_block_height")]
730 fn py_unsubscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
731 let client = self.clone();
732 pyo3_async_runtimes::tokio::future_into_py(py, async move {
733 client
734 .unsubscribe_block_height()
735 .await
736 .map_err(to_pyvalue_err)?;
737 Ok(())
738 })
739 }
740}