binance-sdk 45.0.0

This is a lightweight library that works as a connector to the Binance public API.
/*
 * Binance Margin Trading WebSocket Market Streams
 *
 * OpenAPI Specification for the Binance Margin Trading WebSocket Market Streams
 *
 * The version of the OpenAPI document: 1.0.0
 *
 *
 * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
 * https://openapi-generator.tech
 * Do not edit the class manually.
 */

#![allow(unused_imports)]
use serde_json::Value;
use std::sync::{Arc, atomic::Ordering};
use tokio::spawn;

use crate::common::config::ConfigurationWebsocketStreams;
use crate::common::websocket::{
    Subscription, WebsocketBase, WebsocketStream, WebsocketStreams as WebsocketStreamsBase,
    create_stream_handler,
};
use crate::models::{StreamId, WebsocketEvent, WebsocketMode};

mod apis;
mod handle;
mod models;

pub use apis::*;
pub use handle::*;
pub use models::*;

pub struct WebsocketStreams {
    websocket_streams_base: Arc<WebsocketStreamsBase>,
}

impl WebsocketStreams {
    pub(crate) async fn connect(
        config: ConfigurationWebsocketStreams,
        streams: Vec<String>,
        mode: Option<WebsocketMode>,
    ) -> anyhow::Result<Self> {
        let mut cfg = config;
        if let Some(m) = mode {
            cfg.mode = m;
        }

        let websocket_streams_base = WebsocketStreamsBase::new(cfg, vec![], vec![]);

        websocket_streams_base.clone().connect(streams).await?;

        Ok(Self {
            websocket_streams_base: websocket_streams_base.clone(),
        })
    }

    /// Subscribes to WebSocket events with a provided callback function.
    ///
    /// # Arguments
    ///
    /// * `callback` - A mutable function that takes a `WebsocketEvent` and is `Send` and `'static`.
    ///
    /// # Returns
    ///
    /// A `Subscription` that can be used to manage the event subscription.
    ///
    /// # Examples
    ///
    ///
    /// let subscription = `websocket_streams.subscribe_on_ws_events(|event`| {
    ///     // Handle WebSocket event
    /// });
    ///
    pub fn subscribe_on_ws_events<F>(&self, callback: F) -> Subscription
    where
        F: FnMut(WebsocketEvent) + Send + 'static,
    {
        let base = Arc::clone(&self.websocket_streams_base);
        base.common.events.subscribe(callback)
    }

    /// Unsubscribes from WebSocket events for a given `Subscription`.
    ///
    /// # Arguments
    ///
    /// * `subscription` - The `Subscription` to unsubscribe from WebSocket events.
    ///
    /// # Examples
    ///
    ///
    /// let subscription = `websocket_streams.subscribe_on_ws_events(|event`| {
    ///     // Handle WebSocket event
    /// });
    /// `websocket_streams.unsubscribe_from_ws_events(subscription)`;
    ///
    pub fn unsubscribe_from_ws_events(&self, subscription: Subscription) {
        subscription.unsubscribe();
    }

    /// Disconnects the WebSocket connection.
    ///
    /// # Returns
    ///
    /// A `Result` indicating whether the disconnection was successful.
    /// Returns an error if the disconnection fails.
    ///
    /// # Errors
    ///
    /// Returns an [`anyhow::Error`] if the connection fails.
    ///
    /// # Examples
    ///
    ///
    /// let `websocket_streams` = `WebSocketStreams::new`(...);
    /// `websocket_streams.disconnect().await`?;
    ///
    pub async fn disconnect(&self) -> anyhow::Result<()> {
        self.websocket_streams_base
            .disconnect()
            .await
            .map_err(anyhow::Error::msg)
    }

    /// Checks if the WebSocket connection is currently active.
    ///
    /// # Returns
    ///
    /// A `bool` indicating whether the WebSocket connection is established and connected.
    ///
    /// # Examples
    ///
    ///
    /// let `is_active` = `websocket_streams.is_connected().await`;
    /// if `is_active` {
    ///     // WebSocket connection is active
    /// }
    ///
    pub async fn is_connected(&self) -> bool {
        self.websocket_streams_base.is_connected().await
    }

    /// Sends a ping to the WebSocket server to maintain the connection.
    ///
    /// # Examples
    ///
    ///
    /// `websocket_streams.ping_server().await`;
    ///
    ///
    /// This method sends a ping request to the WebSocket server to keep the connection alive
    /// and check the server's responsiveness.
    pub async fn ping_server(&self) {
        self.websocket_streams_base.ping_server().await;
    }

    /// Subscribes to specified WebSocket streams.
    ///
    /// # Arguments
    ///
    /// * `streams` - A vector of stream names to subscribe to
    /// * `id` - An optional identifier for the subscription request
    ///
    /// # Examples
    ///
    ///
    /// `websocket_streams.subscribe(vec`!["`btcusdt@trade".to_string()`], None).await;
    ///
    ///
    /// This method initiates an asynchronous subscription to the specified WebSocket streams.
    /// The subscription is performed in a separate task using `spawn`.
    pub fn subscribe(&self, streams: Vec<String>, id: Option<String>) {
        let base = Arc::clone(&self.websocket_streams_base);
        spawn(async move { base.subscribe(streams, id.map(StreamId::from), None).await });
    }

    /// Unsubscribes from specified WebSocket streams.
    ///
    /// # Arguments
    ///
    /// * `streams` - A vector of stream names to unsubscribe from
    /// * `id` - An optional identifier for the unsubscription request
    ///
    /// # Examples
    ///
    ///
    /// `websocket_streams.unsubscribe(vec`!["`btcusdt@trade".to_string()`], None).await;
    ///
    ///
    /// This method initiates an asynchronous unsubscription from the specified WebSocket streams.
    /// The unsubscription is performed in a separate task using `spawn`.
    pub fn unsubscribe(&self, streams: Vec<String>, id: Option<String>) {
        let base = Arc::clone(&self.websocket_streams_base);
        spawn(async move {
            base.unsubscribe(streams, id.map(StreamId::from), None)
                .await;
        });
    }

    /// Checks if the current WebSocket stream is subscribed to a specific stream.
    ///
    /// # Arguments
    ///
    /// * `stream` - The name of the stream to check for subscription
    ///
    /// # Returns
    ///
    /// A boolean indicating whether the stream is currently subscribed
    ///
    /// # Examples
    ///
    ///
    /// let `is_subscribed` = `websocket_streams.is_subscribed("btcusdt@trade").await`;
    ///
    ///
    /// This method checks the subscription status of a specific WebSocket stream.
    pub async fn is_subscribed(&self, stream: &str) -> bool {
        self.websocket_streams_base.is_subscribed(stream).await
    }

    /// Risk Data Stream
    ///
    /// Establishes a WebSocket stream for risk-specific data events.
    ///
    /// # Arguments
    ///
    /// - `listen_key`: A unique key for identifying the risk data stream
    /// - `id`: An optional identifier for the stream request
    ///
    /// # Returns
    ///
    /// [`Arc<WebsocketStream<RiskDataStreamEventsResponse>>`] on success.
    ///
    /// # Errors
    ///
    /// Returns an [`anyhow::Error`] if the stream creation fails or if parsing the response encounters issues.
    ///
    /// # Examples
    ///
    ///
    /// let `risk_stream` = `websocket_streams.risk_data(listen_key`, None).await?;
    ///
    pub async fn risk_data(
        &self,
        listen_key: String,
        id: Option<String>,
    ) -> anyhow::Result<Arc<WebsocketStream<RiskDataStreamEventsResponse>>> {
        Ok(create_stream_handler::<RiskDataStreamEventsResponse>(
            WebsocketBase::WebsocketStreams(self.websocket_streams_base.clone()),
            listen_key,
            id.map(StreamId::from),
            None,
        )
        .await)
    }

    /// Trade Data Stream
    ///
    /// Establishes a WebSocket stream for trade-specific data events.
    ///
    /// # Arguments
    ///
    /// - `listen_key`: A unique key for identifying the trade data stream
    /// - `id`: An optional identifier for the stream request
    ///
    /// # Returns
    ///
    /// [`Arc<WebsocketStream<TradeDataStreamEventsResponse>>`] on success.
    ///
    /// # Errors
    ///
    /// Returns an [`anyhow::Error`] if the stream creation fails or if parsing the response encounters issues.
    ///
    /// # Examples
    ///
    ///
    /// let `trade_stream` = `websocket_streams.trade_data(listen_key`, None).await?;
    ///
    pub async fn trade_data(
        &self,
        listen_key: String,
        id: Option<String>,
    ) -> anyhow::Result<Arc<WebsocketStream<TradeDataStreamEventsResponse>>> {
        Ok(create_stream_handler::<TradeDataStreamEventsResponse>(
            WebsocketBase::WebsocketStreams(self.websocket_streams_base.clone()),
            listen_key,
            id.map(StreamId::from),
            None,
        )
        .await)
    }
}