bothan_htx/worker.rs
1//! HTX worker implementation.
2//!
3//! This module provides an implementation of the [`AssetWorker`] trait for interacting with
4//! the HTX WebSocket API. It defines the [`Worker`], which is responsible for subscribing
5//! to asset updates via WebSocket connections and storing the data into a shared [`WorkerStore`].
6//!
7//! The worker is configurable via [`WorkerOpts`] and uses [`WebSocketConnector`] to establish
8//! WebSocket connections to HTX endpoints.
9//!
10//! # The module provides:
11//!
12//! - Subscription to asset updates via WebSocket connections in asynchronous tasks
13//! - Ensures graceful cancellation by using a CancellationToken to signal shutdown and a DropGuard
14//! to automatically clean up resources when the worker is dropped
15//! - Metrics collection for observability
16//! - Configurable via endpoint URL
17//!
18//! # Examples
19//!
20//! ```rust
21//! use bothan_htx::{Worker, WorkerOpts};
22//! use bothan_lib::worker::AssetWorker;
23//! use bothan_lib::store::Store;
24//!
25//! #[tokio::test]
26//! async fn test<T: Store>(store: T) {
27//! let opts = WorkerOpts::default();
28//! let ids = vec!["btcusdt".to_string(), "ethusdt".to_string()];
29//!
30//! let worker = Worker::build(opts, &store, ids).await?;
31//! }
32//! ```
33
34use std::sync::Arc;
35use std::time::Duration;
36
37use bothan_lib::metrics::websocket::Metrics;
38use bothan_lib::store::{Store, WorkerStore};
39use bothan_lib::worker::AssetWorker;
40use bothan_lib::worker::error::AssetWorkerError;
41use bothan_lib::worker::websocket::start_listening;
42use tokio_util::sync::{CancellationToken, DropGuard};
43use tracing::{Instrument, Level, span};
44
45use crate::WorkerOpts;
46use crate::api::WebSocketConnector;
47
48pub mod opts;
49
50const WORKER_NAME: &str = "htx";
51const TIMEOUT: Duration = Duration::from_secs(60);
52
53/// Asset worker for subscribing to asset updates via the HTX WebSocket API.
54///
55/// The `Worker` manages asynchronous WebSocket connections for asset updates
56/// and ensures resources are properly cleaned up when dropped.
57pub struct Worker {
58 // We keep this DropGuard to ensure that all internal processes
59 // that the worker holds are dropped when the worker is dropped.
60 _drop_guard: DropGuard,
61}
62
63#[async_trait::async_trait]
64impl AssetWorker for Worker {
65 type Opts = WorkerOpts;
66
67 /// Returns the name identifier for the worker.
68 ///
69 /// This method provides a unique identifier for the HTX worker,
70 /// which is used for metrics collection and logging.
71 ///
72 /// # Returns
73 ///
74 /// A static string slice containing the worker name "htx".
75 fn name(&self) -> &'static str {
76 WORKER_NAME
77 }
78
79 /// Builds and starts the `HTXWorker`.
80 ///
81 /// This method creates an HTX WebSocket client, spawns an asynchronous task
82 /// to subscribe to asset updates, and returns the running [`Worker`] instance.
83 ///
84 /// # Parameters
85 ///
86 /// - `opts`: Configuration options for the worker, including URL
87 /// - `store`: The store instance for persisting asset data
88 /// - `ids`: A vector of asset identifiers to monitor
89 ///
90 /// # Returns
91 ///
92 /// Returns a `Result` containing a `Worker` instance on success,
93 /// or an `AssetWorkerError` if the worker cannot be built.
94 async fn build<S: Store + 'static>(
95 opts: Self::Opts,
96 store: &S,
97 ids: Vec<String>,
98 ) -> Result<Self, AssetWorkerError> {
99 let url = opts.url;
100 let connector = Arc::new(WebSocketConnector::new(url));
101
102 let worker_store = WorkerStore::new(store, WORKER_NAME);
103
104 let token = CancellationToken::new();
105
106 let metrics = Metrics::new(WORKER_NAME, WORKER_NAME.to_string());
107
108 let span = span!(Level::ERROR, "source", name = WORKER_NAME);
109 tokio::spawn(
110 start_listening(
111 token.child_token(),
112 connector.clone(),
113 worker_store.clone(),
114 ids,
115 TIMEOUT,
116 metrics,
117 )
118 .instrument(span),
119 );
120
121 Ok(Worker {
122 _drop_guard: token.drop_guard(),
123 })
124 }
125}