bothan_okx/worker.rs
1//! OKX worker implementation.
2//!
3//! This module provides an implementation of the [`AssetWorker`] trait for interacting with
4//! the OKX WebSocket API. It defines the [`Worker`], which is responsible for subscribing
5//! to asset updates via WebSocket connections and storing the data into a shared [`WorkerStore`].
6//!
7//! The worker is configurable via [`WorkerOpts`] and uses [`WebSocketConnector`] to establish
8//! WebSocket connections to OKX endpoints.
9//!
10//! # The module provides:
11//!
12//! - Subscription to asset updates via WebSocket connections in asynchronous tasks
13//! - Ensures graceful cancellation by using a CancellationToken to signal shutdown and a DropGuard
14//! to automatically clean up resources when the worker is dropped
15//! - Metrics collection for observability
16//! - Configurable via endpoint URL
17
18use std::sync::Arc;
19use std::time::Duration;
20
21use bothan_lib::metrics::websocket::Metrics;
22use bothan_lib::store::{Store, WorkerStore};
23use bothan_lib::worker::AssetWorker;
24use bothan_lib::worker::error::AssetWorkerError;
25use bothan_lib::worker::websocket::start_listening;
26use tokio_util::sync::{CancellationToken, DropGuard};
27use tracing::{Instrument, Level, span};
28
29use crate::WorkerOpts;
30use crate::api::websocket::WebSocketConnector;
31
32pub mod opts;
33
34const WORKER_NAME: &str = "okx";
35const TIMEOUT: Duration = Duration::from_secs(60);
36
37/// Asset worker for subscribing to asset updates via the OKX WebSocket API.
38///
39/// The `Worker` manages asynchronous WebSocket connections for asset updates
40/// and ensures resources are properly cleaned up when dropped.
41pub struct Worker {
42 // We keep this DropGuard to ensure that all internal processes
43 // that the worker holds are dropped when the worker is dropped.
44 _drop_guard: DropGuard,
45}
46
47#[async_trait::async_trait]
48impl AssetWorker for Worker {
49 type Opts = WorkerOpts;
50
51 /// Returns the name identifier for the worker.
52 ///
53 /// This method provides a unique identifier for the OKX worker,
54 /// which is used for metrics collection and logging.
55 ///
56 /// # Returns
57 ///
58 /// A static string slice containing the worker name "okx".
59 fn name(&self) -> &'static str {
60 WORKER_NAME
61 }
62
63 /// Builds and starts the `OKXWorker`.
64 ///
65 /// This method creates an OKX WebSocket client, spawns an asynchronous task
66 /// to subscribe to asset updates, and returns the running [`Worker`] instance.
67 ///
68 /// # Parameters
69 ///
70 /// - `opts`: Configuration options for the worker, including URL
71 /// - `store`: The store instance for persisting asset data
72 /// - `ids`: A vector of asset identifiers to monitor
73 ///
74 /// # Returns
75 ///
76 /// Returns a `Result` containing a `Worker` instance on success,
77 /// or an `AssetWorkerError` if the worker cannot be built.
78 async fn build<S: Store + 'static>(
79 opts: Self::Opts,
80 store: &S,
81 ids: Vec<String>,
82 ) -> Result<Self, AssetWorkerError> {
83 let url = opts.url;
84 let connector = Arc::new(WebSocketConnector::new(url));
85 let worker_store = WorkerStore::new(store, WORKER_NAME);
86 let token = CancellationToken::new();
87 let metrics = Metrics::new(WORKER_NAME, WORKER_NAME.to_string());
88
89 let span = span!(Level::ERROR, "source", name = WORKER_NAME);
90
91 tokio::spawn(
92 start_listening(
93 token.child_token(),
94 connector,
95 worker_store,
96 ids,
97 TIMEOUT,
98 metrics,
99 )
100 .instrument(span),
101 );
102
103 Ok(Worker {
104 _drop_guard: token.drop_guard(),
105 })
106 }
107}