1use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{call_python_threadsafe, to_pyruntime_err, to_pyvalue_err};
47use nautilus_model::{
48 data::bar::BarType,
49 identifiers::{AccountId, InstrumentId},
50 python::{
51 data::data_to_pycapsule,
52 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
53 },
54};
55use pyo3::{conversion::IntoPyObjectExt, prelude::*};
56
57use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
58
59#[pymethods]
60impl BitmexWebSocketClient {
61 #[new]
62 #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
63 fn py_new(
64 url: Option<String>,
65 api_key: Option<String>,
66 api_secret: Option<String>,
67 account_id: Option<AccountId>,
68 heartbeat: Option<u64>,
69 testnet: bool,
70 ) -> PyResult<Self> {
71 Self::new_with_env(url, api_key, api_secret, account_id, heartbeat, testnet)
72 .map_err(to_pyvalue_err)
73 }
74
75 #[staticmethod]
76 #[pyo3(name = "from_env")]
77 fn py_from_env() -> PyResult<Self> {
78 Self::from_env().map_err(to_pyvalue_err)
79 }
80
81 #[getter]
82 #[pyo3(name = "url")]
83 #[must_use]
84 pub const fn py_url(&self) -> &str {
85 self.url()
86 }
87
88 #[getter]
89 #[pyo3(name = "api_key")]
90 #[must_use]
91 pub fn py_api_key(&self) -> Option<&str> {
92 self.api_key()
93 }
94
95 #[getter]
96 #[pyo3(name = "api_key_masked")]
97 #[must_use]
98 pub fn py_api_key_masked(&self) -> Option<String> {
99 self.api_key_masked()
100 }
101
102 #[pyo3(name = "is_active")]
103 fn py_is_active(&mut self) -> bool {
104 self.is_active()
105 }
106
107 #[pyo3(name = "is_closed")]
108 fn py_is_closed(&mut self) -> bool {
109 self.is_closed()
110 }
111
112 #[pyo3(name = "get_subscriptions")]
113 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
114 self.get_subscriptions(instrument_id)
115 }
116
117 #[pyo3(name = "set_account_id")]
118 pub fn py_set_account_id(&mut self, account_id: AccountId) {
119 self.set_account_id(account_id);
120 }
121
122 #[pyo3(name = "cache_instrument")]
123 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
124 let inst_any = pyobject_to_instrument_any(py, instrument)?;
125 self.cache_instrument(inst_any);
126 Ok(())
127 }
128
129 #[pyo3(name = "connect")]
130 fn py_connect<'py>(
131 &mut self,
132 py: Python<'py>,
133 loop_: Py<PyAny>,
134 instruments: Vec<Py<PyAny>>,
135 callback: Py<PyAny>,
136 ) -> PyResult<Bound<'py, PyAny>> {
137 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
138
139 let mut instruments_any = Vec::new();
140 for inst in instruments {
141 let inst_any = pyobject_to_instrument_any(py, inst)?;
142 instruments_any.push(inst_any);
143 }
144
145 self.cache_instruments(instruments_any);
146
147 let mut client = self.clone();
150
151 pyo3_async_runtimes::tokio::future_into_py(py, async move {
152 client.connect().await.map_err(to_pyruntime_err)?;
153
154 let stream = client.stream();
155
156 get_runtime().spawn(async move {
157 let _client = client; tokio::pin!(stream);
159
160 while let Some(msg) = stream.next().await {
161 Python::attach(|py| match msg {
162 NautilusWsMessage::Data(data_vec) => {
163 for data in data_vec {
164 let py_obj = data_to_pycapsule(py, data);
165 call_python_threadsafe(py, &call_soon, &callback, py_obj);
166 }
167 }
168 NautilusWsMessage::Instruments(instruments) => {
169 for instrument in instruments {
170 if let Ok(py_obj) = instrument_any_to_pyobject(py, instrument) {
171 call_python_threadsafe(py, &call_soon, &callback, py_obj);
172 }
173 }
174 }
175 NautilusWsMessage::OrderStatusReports(reports) => {
176 for report in reports {
177 if let Ok(py_obj) = report.into_py_any(py) {
178 call_python_threadsafe(py, &call_soon, &callback, py_obj);
179 }
180 }
181 }
182 NautilusWsMessage::FillReports(reports) => {
183 for report in reports {
184 if let Ok(py_obj) = report.into_py_any(py) {
185 call_python_threadsafe(py, &call_soon, &callback, py_obj);
186 }
187 }
188 }
189 NautilusWsMessage::PositionStatusReports(reports) => {
190 for report in reports {
191 if let Ok(py_obj) = report.into_py_any(py) {
192 call_python_threadsafe(py, &call_soon, &callback, py_obj);
193 }
194 }
195 }
196 NautilusWsMessage::FundingRateUpdates(updates) => {
197 for update in updates {
198 if let Ok(py_obj) = update.into_py_any(py) {
199 call_python_threadsafe(py, &call_soon, &callback, py_obj);
200 }
201 }
202 }
203 NautilusWsMessage::AccountStates(states) => {
204 for state in states {
205 if let Ok(py_obj) = state.into_py_any(py) {
206 call_python_threadsafe(py, &call_soon, &callback, py_obj);
207 }
208 }
209 }
210 NautilusWsMessage::OrderUpdated(event) => {
211 if let Ok(py_obj) = (*event).into_py_any(py) {
212 call_python_threadsafe(py, &call_soon, &callback, py_obj);
213 }
214 }
215 NautilusWsMessage::OrderUpdates(events) => {
216 for event in events {
217 if let Ok(py_obj) = event.into_py_any(py) {
218 call_python_threadsafe(py, &call_soon, &callback, py_obj);
219 }
220 }
221 }
222 NautilusWsMessage::InstrumentStatus(status) => {
223 if let Ok(py_obj) = status.into_py_any(py) {
224 call_python_threadsafe(py, &call_soon, &callback, py_obj);
225 }
226 }
227 NautilusWsMessage::Reconnected => {}
228 NautilusWsMessage::Authenticated => {}
229 });
230 }
231 });
232
233 Ok(())
234 })
235 }
236
237 #[pyo3(name = "wait_until_active")]
238 fn py_wait_until_active<'py>(
239 &self,
240 py: Python<'py>,
241 timeout_secs: f64,
242 ) -> PyResult<Bound<'py, PyAny>> {
243 let client = self.clone();
244
245 pyo3_async_runtimes::tokio::future_into_py(py, async move {
246 client
247 .wait_until_active(timeout_secs)
248 .await
249 .map_err(to_pyruntime_err)?;
250 Ok(())
251 })
252 }
253
254 #[pyo3(name = "close")]
255 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
256 let mut client = self.clone();
257
258 pyo3_async_runtimes::tokio::future_into_py(py, async move {
259 if let Err(e) = client.close().await {
260 log::error!("Error on close: {e}");
261 }
262 Ok(())
263 })
264 }
265
266 #[pyo3(name = "subscribe_instruments")]
267 fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
268 let client = self.clone();
269
270 pyo3_async_runtimes::tokio::future_into_py(py, async move {
271 if let Err(e) = client.subscribe_instruments().await {
272 log::error!("Failed to subscribe to instruments: {e}");
273 }
274 Ok(())
275 })
276 }
277
278 #[pyo3(name = "subscribe_instrument")]
279 fn py_subscribe_instrument<'py>(
280 &self,
281 py: Python<'py>,
282 instrument_id: InstrumentId,
283 ) -> PyResult<Bound<'py, PyAny>> {
284 let client = self.clone();
285
286 pyo3_async_runtimes::tokio::future_into_py(py, async move {
287 if let Err(e) = client.subscribe_instrument(instrument_id).await {
288 log::error!("Failed to subscribe to instrument: {e}");
289 }
290 Ok(())
291 })
292 }
293
294 #[pyo3(name = "subscribe_book")]
295 fn py_subscribe_book<'py>(
296 &self,
297 py: Python<'py>,
298 instrument_id: InstrumentId,
299 ) -> PyResult<Bound<'py, PyAny>> {
300 let client = self.clone();
301
302 pyo3_async_runtimes::tokio::future_into_py(py, async move {
303 if let Err(e) = client.subscribe_book(instrument_id).await {
304 log::error!("Failed to subscribe to order book: {e}");
305 }
306 Ok(())
307 })
308 }
309
310 #[pyo3(name = "subscribe_book_25")]
311 fn py_subscribe_book_25<'py>(
312 &self,
313 py: Python<'py>,
314 instrument_id: InstrumentId,
315 ) -> PyResult<Bound<'py, PyAny>> {
316 let client = self.clone();
317
318 pyo3_async_runtimes::tokio::future_into_py(py, async move {
319 if let Err(e) = client.subscribe_book_25(instrument_id).await {
320 log::error!("Failed to subscribe to order book 25: {e}");
321 }
322 Ok(())
323 })
324 }
325
326 #[pyo3(name = "subscribe_book_depth10")]
327 fn py_subscribe_book_depth10<'py>(
328 &self,
329 py: Python<'py>,
330 instrument_id: InstrumentId,
331 ) -> PyResult<Bound<'py, PyAny>> {
332 let client = self.clone();
333
334 pyo3_async_runtimes::tokio::future_into_py(py, async move {
335 if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
336 log::error!("Failed to subscribe to order book depth 10: {e}");
337 }
338 Ok(())
339 })
340 }
341
342 #[pyo3(name = "subscribe_quotes")]
343 fn py_subscribe_quotes<'py>(
344 &self,
345 py: Python<'py>,
346 instrument_id: InstrumentId,
347 ) -> PyResult<Bound<'py, PyAny>> {
348 let client = self.clone();
349
350 pyo3_async_runtimes::tokio::future_into_py(py, async move {
351 if let Err(e) = client.subscribe_quotes(instrument_id).await {
352 log::error!("Failed to subscribe to quotes: {e}");
353 }
354 Ok(())
355 })
356 }
357
358 #[pyo3(name = "subscribe_trades")]
359 fn py_subscribe_trades<'py>(
360 &self,
361 py: Python<'py>,
362 instrument_id: InstrumentId,
363 ) -> PyResult<Bound<'py, PyAny>> {
364 let client = self.clone();
365
366 pyo3_async_runtimes::tokio::future_into_py(py, async move {
367 if let Err(e) = client.subscribe_trades(instrument_id).await {
368 log::error!("Failed to subscribe to trades: {e}");
369 }
370 Ok(())
371 })
372 }
373
374 #[pyo3(name = "subscribe_mark_prices")]
375 fn py_subscribe_mark_prices<'py>(
376 &self,
377 py: Python<'py>,
378 instrument_id: InstrumentId,
379 ) -> PyResult<Bound<'py, PyAny>> {
380 let client = self.clone();
381
382 pyo3_async_runtimes::tokio::future_into_py(py, async move {
383 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
384 log::error!("Failed to subscribe to mark prices: {e}");
385 }
386 Ok(())
387 })
388 }
389
390 #[pyo3(name = "subscribe_index_prices")]
391 fn py_subscribe_index_prices<'py>(
392 &self,
393 py: Python<'py>,
394 instrument_id: InstrumentId,
395 ) -> PyResult<Bound<'py, PyAny>> {
396 let client = self.clone();
397
398 pyo3_async_runtimes::tokio::future_into_py(py, async move {
399 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
400 log::error!("Failed to subscribe to index prices: {e}");
401 }
402 Ok(())
403 })
404 }
405
406 #[pyo3(name = "subscribe_funding_rates")]
407 fn py_subscribe_funding_rates<'py>(
408 &self,
409 py: Python<'py>,
410 instrument_id: InstrumentId,
411 ) -> PyResult<Bound<'py, PyAny>> {
412 let client = self.clone();
413
414 pyo3_async_runtimes::tokio::future_into_py(py, async move {
415 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
416 log::error!("Failed to subscribe to funding: {e}");
417 }
418 Ok(())
419 })
420 }
421
422 #[pyo3(name = "subscribe_bars")]
423 fn py_subscribe_bars<'py>(
424 &self,
425 py: Python<'py>,
426 bar_type: BarType,
427 ) -> PyResult<Bound<'py, PyAny>> {
428 let client = self.clone();
429
430 pyo3_async_runtimes::tokio::future_into_py(py, async move {
431 if let Err(e) = client.subscribe_bars(bar_type).await {
432 log::error!("Failed to subscribe to bars: {e}");
433 }
434 Ok(())
435 })
436 }
437
438 #[pyo3(name = "unsubscribe_instruments")]
439 fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
440 let client = self.clone();
441
442 pyo3_async_runtimes::tokio::future_into_py(py, async move {
443 if let Err(e) = client.unsubscribe_instruments().await {
444 log::error!("Failed to unsubscribe from instruments: {e}");
445 }
446 Ok(())
447 })
448 }
449
450 #[pyo3(name = "unsubscribe_instrument")]
451 fn py_unsubscribe_instrument<'py>(
452 &self,
453 py: Python<'py>,
454 instrument_id: InstrumentId,
455 ) -> PyResult<Bound<'py, PyAny>> {
456 let client = self.clone();
457
458 pyo3_async_runtimes::tokio::future_into_py(py, async move {
459 if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
460 log::error!("Failed to unsubscribe from instrument: {e}");
461 }
462 Ok(())
463 })
464 }
465
466 #[pyo3(name = "unsubscribe_book")]
467 fn py_unsubscribe_book<'py>(
468 &self,
469 py: Python<'py>,
470 instrument_id: InstrumentId,
471 ) -> PyResult<Bound<'py, PyAny>> {
472 let client = self.clone();
473
474 pyo3_async_runtimes::tokio::future_into_py(py, async move {
475 if let Err(e) = client.unsubscribe_book(instrument_id).await {
476 log::error!("Failed to unsubscribe from order book: {e}");
477 }
478 Ok(())
479 })
480 }
481
482 #[pyo3(name = "unsubscribe_book_25")]
483 fn py_unsubscribe_book_25<'py>(
484 &self,
485 py: Python<'py>,
486 instrument_id: InstrumentId,
487 ) -> PyResult<Bound<'py, PyAny>> {
488 let client = self.clone();
489
490 pyo3_async_runtimes::tokio::future_into_py(py, async move {
491 if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
492 log::error!("Failed to unsubscribe from order book 25: {e}");
493 }
494 Ok(())
495 })
496 }
497
498 #[pyo3(name = "unsubscribe_book_depth10")]
499 fn py_unsubscribe_book_depth10<'py>(
500 &self,
501 py: Python<'py>,
502 instrument_id: InstrumentId,
503 ) -> PyResult<Bound<'py, PyAny>> {
504 let client = self.clone();
505
506 pyo3_async_runtimes::tokio::future_into_py(py, async move {
507 if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
508 log::error!("Failed to unsubscribe from order book depth 10: {e}");
509 }
510 Ok(())
511 })
512 }
513
514 #[pyo3(name = "unsubscribe_quotes")]
515 fn py_unsubscribe_quotes<'py>(
516 &self,
517 py: Python<'py>,
518 instrument_id: InstrumentId,
519 ) -> PyResult<Bound<'py, PyAny>> {
520 let client = self.clone();
521
522 pyo3_async_runtimes::tokio::future_into_py(py, async move {
523 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
524 log::error!("Failed to unsubscribe from quotes: {e}");
525 }
526 Ok(())
527 })
528 }
529
530 #[pyo3(name = "unsubscribe_trades")]
531 fn py_unsubscribe_trades<'py>(
532 &self,
533 py: Python<'py>,
534 instrument_id: InstrumentId,
535 ) -> PyResult<Bound<'py, PyAny>> {
536 let client = self.clone();
537
538 pyo3_async_runtimes::tokio::future_into_py(py, async move {
539 if let Err(e) = client.unsubscribe_trades(instrument_id).await {
540 log::error!("Failed to unsubscribe from trades: {e}");
541 }
542 Ok(())
543 })
544 }
545
546 #[pyo3(name = "unsubscribe_mark_prices")]
547 fn py_unsubscribe_mark_prices<'py>(
548 &self,
549 py: Python<'py>,
550 instrument_id: InstrumentId,
551 ) -> PyResult<Bound<'py, PyAny>> {
552 let client = self.clone();
553
554 pyo3_async_runtimes::tokio::future_into_py(py, async move {
555 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
556 log::error!("Failed to unsubscribe from mark prices: {e}");
557 }
558 Ok(())
559 })
560 }
561
562 #[pyo3(name = "unsubscribe_index_prices")]
563 fn py_unsubscribe_index_prices<'py>(
564 &self,
565 py: Python<'py>,
566 instrument_id: InstrumentId,
567 ) -> PyResult<Bound<'py, PyAny>> {
568 let client = self.clone();
569
570 pyo3_async_runtimes::tokio::future_into_py(py, async move {
571 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
572 log::error!("Failed to unsubscribe from index prices: {e}");
573 }
574 Ok(())
575 })
576 }
577
578 #[pyo3(name = "unsubscribe_funding_rates")]
579 fn py_unsubscribe_funding_rates<'py>(
580 &self,
581 py: Python<'py>,
582 instrument_id: InstrumentId,
583 ) -> PyResult<Bound<'py, PyAny>> {
584 let client = self.clone();
585 pyo3_async_runtimes::tokio::future_into_py(py, async move {
586 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
587 log::error!("Failed to unsubscribe from funding rates: {e}");
588 }
589 Ok(())
590 })
591 }
592
593 #[pyo3(name = "unsubscribe_bars")]
594 fn py_unsubscribe_bars<'py>(
595 &self,
596 py: Python<'py>,
597 bar_type: BarType,
598 ) -> PyResult<Bound<'py, PyAny>> {
599 let client = self.clone();
600
601 pyo3_async_runtimes::tokio::future_into_py(py, async move {
602 if let Err(e) = client.unsubscribe_bars(bar_type).await {
603 log::error!("Failed to unsubscribe from bars: {e}");
604 }
605 Ok(())
606 })
607 }
608
609 #[pyo3(name = "subscribe_orders")]
610 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
611 let client = self.clone();
612
613 pyo3_async_runtimes::tokio::future_into_py(py, async move {
614 if let Err(e) = client.subscribe_orders().await {
615 log::error!("Failed to subscribe to orders: {e}");
616 }
617 Ok(())
618 })
619 }
620
621 #[pyo3(name = "subscribe_executions")]
622 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
623 let client = self.clone();
624
625 pyo3_async_runtimes::tokio::future_into_py(py, async move {
626 if let Err(e) = client.subscribe_executions().await {
627 log::error!("Failed to subscribe to executions: {e}");
628 }
629 Ok(())
630 })
631 }
632
633 #[pyo3(name = "subscribe_positions")]
634 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
635 let client = self.clone();
636
637 pyo3_async_runtimes::tokio::future_into_py(py, async move {
638 if let Err(e) = client.subscribe_positions().await {
639 log::error!("Failed to subscribe to positions: {e}");
640 }
641 Ok(())
642 })
643 }
644
645 #[pyo3(name = "subscribe_margin")]
646 fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
647 let client = self.clone();
648
649 pyo3_async_runtimes::tokio::future_into_py(py, async move {
650 if let Err(e) = client.subscribe_margin().await {
651 log::error!("Failed to subscribe to margin: {e}");
652 }
653 Ok(())
654 })
655 }
656
657 #[pyo3(name = "subscribe_wallet")]
658 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
659 let client = self.clone();
660
661 pyo3_async_runtimes::tokio::future_into_py(py, async move {
662 if let Err(e) = client.subscribe_wallet().await {
663 log::error!("Failed to subscribe to wallet: {e}");
664 }
665 Ok(())
666 })
667 }
668
669 #[pyo3(name = "unsubscribe_orders")]
670 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
671 let client = self.clone();
672
673 pyo3_async_runtimes::tokio::future_into_py(py, async move {
674 if let Err(e) = client.unsubscribe_orders().await {
675 log::error!("Failed to unsubscribe from orders: {e}");
676 }
677 Ok(())
678 })
679 }
680
681 #[pyo3(name = "unsubscribe_executions")]
682 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
683 let client = self.clone();
684
685 pyo3_async_runtimes::tokio::future_into_py(py, async move {
686 if let Err(e) = client.unsubscribe_executions().await {
687 log::error!("Failed to unsubscribe from executions: {e}");
688 }
689 Ok(())
690 })
691 }
692
693 #[pyo3(name = "unsubscribe_positions")]
694 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
695 let client = self.clone();
696
697 pyo3_async_runtimes::tokio::future_into_py(py, async move {
698 if let Err(e) = client.unsubscribe_positions().await {
699 log::error!("Failed to unsubscribe from positions: {e}");
700 }
701 Ok(())
702 })
703 }
704
705 #[pyo3(name = "unsubscribe_margin")]
706 fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
707 let client = self.clone();
708
709 pyo3_async_runtimes::tokio::future_into_py(py, async move {
710 if let Err(e) = client.unsubscribe_margin().await {
711 log::error!("Failed to unsubscribe from margin: {e}");
712 }
713 Ok(())
714 })
715 }
716
717 #[pyo3(name = "unsubscribe_wallet")]
718 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
719 let client = self.clone();
720
721 pyo3_async_runtimes::tokio::future_into_py(py, async move {
722 if let Err(e) = client.unsubscribe_wallet().await {
723 log::error!("Failed to unsubscribe from wallet: {e}");
724 }
725 Ok(())
726 })
727 }
728}