Skip to main content

bothan_binance/
worker.rs

1//! Binance worker implementation.
2//!
3//! This module provides an implementation of the [`AssetWorker`] trait for interacting with
4//! the Binance WebSocket API. It defines the [`Worker`], which is responsible for subscribing
5//! to asset updates via WebSocket connections and storing the data into a shared [`WorkerStore`].
6//!
7//! The worker is configurable via [`WorkerOpts`] and uses [`WebSocketConnector`] to establish
8//! WebSocket connections to Binance endpoints.
9//!
10//! # The module provides:
11//!
12//! - Subscription to asset updates via WebSocket connections in asynchronous tasks
//! - Graceful cancellation: a `CancellationToken` signals shutdown, and a `DropGuard`
//!   triggers it automatically when the worker is dropped
15//! - Metrics collection for observability
16//! - Configurable via endpoint URL and maximum subscriptions per connection
17//!
18//! # Examples
19//!
//! ```rust
//! use bothan_binance::worker::{Worker, WorkerOpts};
//! use bothan_lib::store::Store;
//! use bothan_lib::worker::AssetWorker;
//! use bothan_lib::worker::error::AssetWorkerError;
//!
//! async fn run<T: Store + 'static>(store: T) -> Result<(), AssetWorkerError> {
//!     let opts = WorkerOpts::default();
//!     let ids = vec!["BTCUSDT".to_string(), "ETHUSDT".to_string()];
//!
//!     // Keep the worker alive for as long as updates are needed;
//!     // dropping it cancels its background tasks.
//!     let _worker = Worker::build(opts, &store, ids).await?;
//!     Ok(())
//! }
//! ```
33
34use std::sync::Arc;
35use std::time::Duration;
36
37use bothan_lib::metrics::websocket::Metrics;
38use bothan_lib::store::{Store, WorkerStore};
39use bothan_lib::worker::AssetWorker;
40use bothan_lib::worker::error::AssetWorkerError;
41use bothan_lib::worker::websocket::start_listening;
42use itertools;
43use itertools::Itertools;
44use tokio_util::sync::{CancellationToken, DropGuard};
45use tracing::{Instrument, Level, span};
46
47pub use crate::WorkerOpts;
48use crate::api::websocket::WebSocketConnector;
49
50pub mod opts;
51
/// Identifier of this worker: returned by `Worker::name` and used to label the
/// worker store, the metrics and the tracing spans created in `Worker::build`.
const WORKER_NAME: &str = "binance";

/// Timeout handed to every listener task started by `Worker::build` (12 minutes).
const TIMEOUT: Duration = Duration::from_secs(12 * 60);

/// Limit of 200 subscriptions per WebSocket connection.
///
/// NOTE(review): presumably mirrors the default of
/// `WorkerOpts::max_subscription_per_connection` — confirm against the `opts` module.
const MAX_SUBSCRIPTION_PER_CONNECTION: usize = 200;
55
56/// Asset worker for subscribing to asset updates via the Binance WebSocket API.
57///
58/// The `Worker` manages asynchronous WebSocket connections for asset updates
59/// and ensures resources are properly cleaned up when dropped.
60pub struct Worker {
61    // We keep this DropGuard to ensure that all internal processes
62    // that the worker holds are dropped when the worker is dropped.
63    _drop_guard: DropGuard,
64}
65
66#[async_trait::async_trait]
67impl AssetWorker for Worker {
68    type Opts = WorkerOpts;
69
70    /// Returns the name identifier for the worker.
71    fn name(&self) -> &'static str {
72        WORKER_NAME
73    }
74
75    /// Builds and starts the `BinanceWorker`.
76    ///
77    /// This method creates a Binance WebSocket client, spawns asynchronous tasks
78    /// to subscribe to asset updates, and returns the running [`Worker`] instance.
79    async fn build<S: Store + 'static>(
80        opts: Self::Opts,
81        store: &S,
82        ids: Vec<String>,
83    ) -> Result<Self, AssetWorkerError> {
84        let url = opts.url;
85        let connector = Arc::new(WebSocketConnector::new(url));
86
87        let worker_store = WorkerStore::new(store, WORKER_NAME);
88        let token = CancellationToken::new();
89
90        for (i, set) in ids
91            .into_iter()
92            .chunks(opts.max_subscription_per_connection)
93            .into_iter()
94            .enumerate()
95        {
96            let span = span!(
97                Level::ERROR,
98                "source",
99                name = WORKER_NAME,
100                connection_idx = i
101            );
102            let worker = format!("{WORKER_NAME}_{i}");
103            let metrics = Metrics::new(WORKER_NAME, worker);
104            tokio::spawn(
105                start_listening(
106                    token.child_token(),
107                    connector.clone(),
108                    worker_store.clone(),
109                    set.collect(),
110                    TIMEOUT,
111                    metrics,
112                )
113                .instrument(span),
114            );
115        }
116
117        Ok(Worker {
118            _drop_guard: token.drop_guard(),
119        })
120    }
121}