bothan_bitfinex/worker.rs
1//! Bitfinex worker implementation.
2//!
3//! This module provides an implementation of the [`AssetWorker`] trait for interacting with
4//! the Bitfinex REST API. It defines the [`Worker`], which is responsible for polling
5//! asset updates via REST API calls and storing the data into a shared [`WorkerStore`].
6//!
7//! The worker is configurable via [`WorkerOpts`] and uses [`RestApiBuilder`] to create
8//! REST API clients for Bitfinex endpoints.
9//!
10//! # The module provides:
11//!
12//! - Periodic polling of asset updates via REST API calls in asynchronous tasks
//! - Graceful cancellation: a `CancellationToken` signals shutdown and a `DropGuard`
//!   automatically cleans up resources when the worker is dropped
15//! - Metrics collection for observability
16//! - Configurable via endpoint URL and update interval
17//!
18//! # Examples
19//!
20//! ```rust
//! use bothan_bitfinex::worker::Worker;
//! use bothan_bitfinex::worker::opts::WorkerOpts;
//! use bothan_lib::store::Store;
//! use bothan_lib::worker::AssetWorker;
//! use bothan_lib::worker::error::AssetWorkerError;
//!
//! async fn example<T: Store + 'static>(store: T) -> Result<(), AssetWorkerError> {
//!     let opts = WorkerOpts::default();
//!     let ids = vec!["tBTCUSD".to_string(), "tETHUSD".to_string()];
//!
//!     // Keep the returned worker alive for as long as polling should continue.
//!     let _worker = Worker::build(opts, &store, ids).await?;
//!     Ok(())
//! }
33//! ```
34
35use bothan_lib::metrics::rest::Metrics;
36use bothan_lib::store::{Store, WorkerStore};
37use bothan_lib::worker::AssetWorker;
38use bothan_lib::worker::error::AssetWorkerError;
39use bothan_lib::worker::rest::start_polling;
40use tokio_util::sync::{CancellationToken, DropGuard};
41use tracing::{Instrument, Level, span};
42
43use crate::api::builder::RestApiBuilder;
44use crate::worker::opts::WorkerOpts;
45
/// Identifier of this worker.
///
/// Within this module it is returned by `Worker::name`, passed to the
/// `WorkerStore` and `Metrics` constructors, and recorded as the `name`
/// field of the tracing span that wraps the polling task.
const WORKER_NAME: &str = "bitfinex";
47
48pub mod opts;
49
50/// Asset worker for polling asset updates via the Bitfinex REST API.
51///
52/// The `Worker` manages asynchronous REST API polling for asset updates
53/// and ensures resources are properly cleaned up when dropped.
54pub struct Worker {
55 // We keep this DropGuard to ensure that all internal processes
56 // that the worker holds are dropped when the worker is dropped.
57 _drop_guard: DropGuard,
58}
59
60#[async_trait::async_trait]
61impl AssetWorker for Worker {
62 type Opts = WorkerOpts;
63
64 /// Returns the name identifier for the worker.
65 ///
66 /// This method provides a unique identifier for the Bitfinex worker,
67 /// which is used for metrics collection and logging.
68 ///
69 /// # Returns
70 ///
71 /// A static string slice containing the worker name "bitfinex".
72 fn name(&self) -> &'static str {
73 WORKER_NAME
74 }
75
76 /// Builds and starts the `BitfinexWorker`.
77 ///
78 /// This method creates a Bitfinex REST API client, spawns an asynchronous task
79 /// to poll for asset updates at the configured interval, and returns the running [`Worker`] instance.
80 ///
81 /// # Parameters
82 ///
83 /// - `opts`: Configuration options for the worker, including URL and update interval
84 /// - `store`: The store instance for persisting asset data
85 /// - `ids`: A vector of asset identifiers to monitor
86 ///
87 /// # Returns
88 ///
89 /// Returns a `Result` containing a `Worker` instance on success,
90 /// or an `AssetWorkerError` if the worker cannot be built.
91 async fn build<S: Store + 'static>(
92 opts: Self::Opts,
93 store: &S,
94 ids: Vec<String>,
95 ) -> Result<Self, AssetWorkerError> {
96 let api = RestApiBuilder::new(&opts.url).build()?;
97 let worker_store = WorkerStore::new(store, WORKER_NAME);
98
99 let token = CancellationToken::new();
100
101 let metrics = Metrics::new(WORKER_NAME);
102
103 let span = span!(Level::ERROR, "source", name = WORKER_NAME);
104 tokio::spawn(
105 start_polling(
106 token.child_token(),
107 opts.update_interval,
108 api,
109 worker_store,
110 ids,
111 metrics,
112 )
113 .instrument(span),
114 );
115
116 Ok(Worker {
117 _drop_guard: token.drop_guard(),
118 })
119 }
120}