nautilus-bitmex 0.55.0

BitMEX exchange integration adapter for the Nautilus trading engine
Documentation
// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Python bindings for the BitMEX cancel broadcaster.

use nautilus_core::python::to_pyvalue_err;
use nautilus_model::{
    enums::OrderSide,
    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
    python::instruments::pyobject_to_instrument_any,
};
use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};

use crate::broadcast::canceller::{CancelBroadcaster, CancelBroadcasterConfig};

#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl CancelBroadcaster {
    /// Broadcasts cancel requests to multiple HTTP clients for redundancy.
    ///
    /// This broadcaster fans out cancel requests to multiple pre-warmed HTTP clients
    /// in parallel, short-circuits when the first successful acknowledgement is received,
    /// and handles expected rejection patterns with appropriate log levels.
    #[new]
    #[pyo3(signature = (
        pool_size,
        api_key=None,
        api_secret=None,
        base_url=None,
        testnet=false,
        timeout_secs=60,
        max_retries=3,
        retry_delay_ms=1_000,
        retry_delay_max_ms=5_000,
        recv_window_ms=10_000,
        max_requests_per_second=10,
        max_requests_per_minute=120,
        health_check_interval_secs=30,
        health_check_timeout_secs=5,
        expected_reject_patterns=None,
        idempotent_success_patterns=None,
        proxy_urls=None
    ))]
    #[allow(clippy::too_many_arguments)]
    fn py_new(
        pool_size: usize,
        api_key: Option<String>,
        api_secret: Option<String>,
        base_url: Option<String>,
        testnet: bool,
        timeout_secs: u64,
        max_retries: u32,
        retry_delay_ms: u64,
        retry_delay_max_ms: u64,
        recv_window_ms: u64,
        max_requests_per_second: u32,
        max_requests_per_minute: u32,
        health_check_interval_secs: u64,
        health_check_timeout_secs: u64,
        expected_reject_patterns: Option<Vec<String>>,
        idempotent_success_patterns: Option<Vec<String>>,
        proxy_urls: Option<Vec<Option<String>>>,
    ) -> PyResult<Self> {
        let config = CancelBroadcasterConfig {
            pool_size,
            api_key,
            api_secret,
            base_url,
            testnet,
            timeout_secs,
            max_retries,
            retry_delay_ms,
            retry_delay_max_ms,
            recv_window_ms,
            max_requests_per_second,
            max_requests_per_minute,
            health_check_interval_secs,
            health_check_timeout_secs,
            expected_reject_patterns: expected_reject_patterns
                .unwrap_or_else(|| CancelBroadcasterConfig::default().expected_reject_patterns),
            idempotent_success_patterns: idempotent_success_patterns
                .unwrap_or_else(|| CancelBroadcasterConfig::default().idempotent_success_patterns),
            proxy_urls: proxy_urls.unwrap_or_default(),
        };

        Self::new(config).map_err(to_pyvalue_err)
    }

    /// Caches an instrument in all HTTP clients in the pool.
    #[pyo3(name = "cache_instrument")]
    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
        let inst_any = pyobject_to_instrument_any(py, instrument)?;
        self.cache_instrument(&inst_any);
        Ok(())
    }

    /// Starts the broadcaster and health check loop.
    ///
    /// # Errors
    ///
    /// Returns an error if the broadcaster is already running.
    #[pyo3(name = "start")]
    fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let broadcaster = self.clone_for_async();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            broadcaster.start().await.map_err(to_pyvalue_err)
        })
    }

    /// Stops the broadcaster and health check loop.
    #[pyo3(name = "stop")]
    fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let broadcaster = self.clone_for_async();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            broadcaster.stop().await;
            Ok(())
        })
    }

    /// Broadcasts a single cancel request to all healthy clients in parallel.
    ///
    /// # Returns
    ///
    /// - `Ok(Some(report))` if successfully cancelled with a report.
    /// - `Ok(None)` if the order was already cancelled (idempotent success).
    /// - `Err` if all requests failed.
    #[pyo3(name = "broadcast_cancel")]
    fn py_broadcast_cancel<'py>(
        &self,
        py: Python<'py>,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> PyResult<Bound<'py, PyAny>> {
        let broadcaster = self.clone_for_async();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let report = broadcaster
                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
                .await
                .map_err(to_pyvalue_err)?;

            Python::attach(|py| match report {
                Some(r) => r.into_py_any(py),
                None => Ok(py.None()),
            })
        })
    }

    /// Broadcasts a batch cancel request to all healthy clients in parallel.
    #[pyo3(name = "broadcast_batch_cancel")]
    fn py_broadcast_batch_cancel<'py>(
        &self,
        py: Python<'py>,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> PyResult<Bound<'py, PyAny>> {
        let broadcaster = self.clone_for_async();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let reports = broadcaster
                .broadcast_batch_cancel(instrument_id, client_order_ids, venue_order_ids)
                .await
                .map_err(to_pyvalue_err)?;

            Python::attach(|py| {
                let py_reports: PyResult<Vec<_>> = reports
                    .into_iter()
                    .map(|report| report.into_py_any(py))
                    .collect();
                let pylist = pyo3::types::PyList::new(py, py_reports?)
                    .unwrap()
                    .into_any()
                    .unbind();
                Ok(pylist)
            })
        })
    }

    /// Broadcasts a cancel all request to all healthy clients in parallel.
    #[pyo3(name = "broadcast_cancel_all")]
    fn py_broadcast_cancel_all<'py>(
        &self,
        py: Python<'py>,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> PyResult<Bound<'py, PyAny>> {
        let broadcaster = self.clone_for_async();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let reports = broadcaster
                .broadcast_cancel_all(instrument_id, order_side)
                .await
                .map_err(to_pyvalue_err)?;

            Python::attach(|py| {
                let py_reports: PyResult<Vec<_>> = reports
                    .into_iter()
                    .map(|report| report.into_py_any(py))
                    .collect();
                let pylist = pyo3::types::PyList::new(py, py_reports?)
                    .unwrap()
                    .into_any()
                    .unbind();
                Ok(pylist)
            })
        })
    }

    /// Gets broadcaster metrics.
    #[pyo3(name = "get_metrics")]
    fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
        let metrics = self.get_metrics();
        let dict = PyDict::new(py);
        dict.set_item("total_cancels", metrics.total_cancels)?;
        dict.set_item("successful_cancels", metrics.successful_cancels)?;
        dict.set_item("failed_cancels", metrics.failed_cancels)?;
        dict.set_item("expected_rejects", metrics.expected_rejects)?;
        dict.set_item("idempotent_successes", metrics.idempotent_successes)?;
        dict.set_item("healthy_clients", metrics.healthy_clients)?;
        dict.set_item("total_clients", metrics.total_clients)?;
        Ok(dict.into())
    }

    /// Gets per-client statistics.
    #[pyo3(name = "get_client_stats")]
    fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
        let stats = self.get_client_stats();
        let list = pyo3::types::PyList::empty(py);
        for stat in stats {
            let dict = PyDict::new(py);
            dict.set_item("client_id", stat.client_id.clone())?;
            dict.set_item("healthy", stat.healthy)?;
            dict.set_item("cancel_count", stat.cancel_count)?;
            dict.set_item("error_count", stat.error_count)?;
            list.append(dict)?;
        }
        Ok(list.into())
    }
}