bothan_binance/worker.rs
1//! Binance worker implementation.
2//!
3//! This module provides an implementation of the [`AssetWorker`] trait for interacting with
4//! the Binance WebSocket API. It defines the [`Worker`], which is responsible for subscribing
5//! to asset updates via WebSocket connections and storing the data into a shared [`WorkerStore`].
6//!
7//! The worker is configurable via [`WorkerOpts`] and uses [`WebSocketConnector`] to establish
8//! WebSocket connections to Binance endpoints.
9//!
//! # The module provides:
//!
//! - Subscription to asset updates via WebSocket connections in asynchronous tasks
//! - Graceful cancellation, using a `CancellationToken` to signal shutdown and a `DropGuard`
//!   to automatically clean up resources when the worker is dropped
//! - Metrics collection for observability
//! - Configuration via endpoint URL and maximum subscriptions per connection
17//!
18//! # Examples
19//!
//! ```rust
//! use bothan_binance::worker::{Worker, WorkerOpts};
//! use bothan_lib::store::Store;
//! use bothan_lib::worker::AssetWorker;
//! use bothan_lib::worker::error::AssetWorkerError;
//!
//! async fn run<T: Store + 'static>(store: T) -> Result<(), AssetWorkerError> {
//!     let opts = WorkerOpts::default();
//!     let ids = vec!["BTCUSDT".to_string(), "ETHUSDT".to_string()];
//!
//!     // Dropping the returned worker cancels its background tasks.
//!     let _worker = Worker::build(opts, &store, ids).await?;
//!     Ok(())
//! }
//! ```
33
34use std::sync::Arc;
35use std::time::Duration;
36
37use bothan_lib::metrics::websocket::Metrics;
38use bothan_lib::store::{Store, WorkerStore};
39use bothan_lib::worker::AssetWorker;
40use bothan_lib::worker::error::AssetWorkerError;
41use bothan_lib::worker::websocket::start_listening;
42use itertools;
43use itertools::Itertools;
44use tokio_util::sync::{CancellationToken, DropGuard};
45use tracing::{Instrument, Level, span};
46
47pub use crate::WorkerOpts;
48use crate::api::websocket::WebSocketConnector;
49
50pub mod opts;
51
/// Identifier of this worker; returned by `Worker::name` and passed to the store,
/// tracing span and metrics created in `Worker::build`.
const WORKER_NAME: &str = "binance";

/// Duration handed to `start_listening` for every connection (12 minutes).
///
/// NOTE(review): presumably the maximum time to wait for data before the
/// connection is treated as stale — confirm against `start_listening`.
const TIMEOUT: Duration = Duration::from_secs(12 * 60);

/// Reference limit for the number of subscriptions carried by one WebSocket connection.
///
/// NOTE(review): presumably mirrors the default of
/// `WorkerOpts::max_subscription_per_connection` — confirm.
const MAX_SUBSCRIPTION_PER_CONNECTION: usize = 200;
55
56/// Asset worker for subscribing to asset updates via the Binance WebSocket API.
57///
58/// The `Worker` manages asynchronous WebSocket connections for asset updates
59/// and ensures resources are properly cleaned up when dropped.
60pub struct Worker {
61 // We keep this DropGuard to ensure that all internal processes
62 // that the worker holds are dropped when the worker is dropped.
63 _drop_guard: DropGuard,
64}
65
66#[async_trait::async_trait]
67impl AssetWorker for Worker {
68 type Opts = WorkerOpts;
69
70 /// Returns the name identifier for the worker.
71 fn name(&self) -> &'static str {
72 WORKER_NAME
73 }
74
75 /// Builds and starts the `BinanceWorker`.
76 ///
77 /// This method creates a Binance WebSocket client, spawns asynchronous tasks
78 /// to subscribe to asset updates, and returns the running [`Worker`] instance.
79 async fn build<S: Store + 'static>(
80 opts: Self::Opts,
81 store: &S,
82 ids: Vec<String>,
83 ) -> Result<Self, AssetWorkerError> {
84 let url = opts.url;
85 let connector = Arc::new(WebSocketConnector::new(url));
86
87 let worker_store = WorkerStore::new(store, WORKER_NAME);
88 let token = CancellationToken::new();
89
90 for (i, set) in ids
91 .into_iter()
92 .chunks(opts.max_subscription_per_connection)
93 .into_iter()
94 .enumerate()
95 {
96 let span = span!(
97 Level::ERROR,
98 "source",
99 name = WORKER_NAME,
100 connection_idx = i
101 );
102 let worker = format!("{WORKER_NAME}_{i}");
103 let metrics = Metrics::new(WORKER_NAME, worker);
104 tokio::spawn(
105 start_listening(
106 token.child_token(),
107 connector.clone(),
108 worker_store.clone(),
109 set.collect(),
110 TIMEOUT,
111 metrics,
112 )
113 .instrument(span),
114 );
115 }
116
117 Ok(Worker {
118 _drop_guard: token.drop_guard(),
119 })
120 }
121}