bothan_lib/worker/rest.rs
1//! REST-based asset information providers and polling mechanisms.
2//!
3//! This module provides functionality for polling asset information from REST APIs.
4//! It defines a trait for asset information providers and a function for starting
5//! a polling loop that periodically fetches and stores asset information.
6//!
7//! The module provides:
8//!
9//! - The [`AssetInfoProvider`] trait which defines the interface for REST-based asset info providers
10//! - The [`start_polling`] function which implements the polling loop mechanism
11//!
12//! # Polling Strategy
13//!
14//! The polling strategy follows these principles:
15//!
16//! 1. **Regular Intervals**: Asset information is polled at regular intervals
17//! 2. **Timeout Protection**: Requests that take too long are cancelled to prevent blocking
18//! 3. **Error Handling**: Errors during polling are logged but don't stop the polling process
19//! 4. **Graceful Cancellation**: Polling can be gracefully stopped using a cancellation token
20//!
21//! When implementing new REST-based asset providers, implement the [`AssetInfoProvider`] trait
22//! and use the [`start_polling`] function to handle the polling lifecycle.
23
24use std::fmt::Display;
25use std::time::{Duration, Instant};
26
27use tokio::select;
28use tokio::time::{interval, timeout};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, info};
31
32use crate::metrics::rest::{Metrics, PollingResult};
33use crate::store::{Store, WorkerStore};
34use crate::types::AssetInfo;
35
36/// Trait for providers that can fetch asset information from REST APIs.
37///
38/// This trait defines the interface that REST-based asset information providers
39/// must implement. Providers are responsible for making HTTP requests to fetch
40/// asset data and converting the responses into [`AssetInfo`] structures.
41#[async_trait::async_trait]
42pub trait AssetInfoProvider: Send + Sync {
43 /// The type returned in the event of an operation failure.
44 ///
45 /// This should be a custom error type that implements the Display trait
46 /// and captures all possible error conditions specific to the API.
47 type Error: Display;
48
49 /// Fetches asset information for the specified asset IDs.
50 ///
51 /// This method should make HTTP requests to the appropriate endpoints,
52 /// parse the responses, and convert the data into a vector of [`AssetInfo`] structures.
53 ///
54 /// # Errors
55 ///
56 /// Returns a provider-specific error if the operation fails, such as when
57 /// the API is unavailable, returns an error response, or the response cannot
58 /// be parsed correctly.
59 async fn get_asset_info(&self, ids: &[String]) -> Result<Vec<AssetInfo>, Self::Error>;
60}
61
62/// Starts polling asset information from a provider at the specified update interval.
63///
64/// This function implements a polling loop that periodically fetches asset information
65/// from the provider and stores it using the provided worker store. The loop continues
66/// until the cancellation token is triggered.
67///
68/// # Features
69///
70/// * Polls at regular intervals specified by `update_interval`
71/// * Times out requests that take too long (based on the update interval)
72/// * Handles errors gracefully by logging them and continuing
73/// * Cancels polling gracefully when requested via the cancellation token
74#[tracing::instrument(skip(cancellation_token, provider, store, ids))]
75pub async fn start_polling<S: Store, E: Display, P: AssetInfoProvider<Error = E>>(
76 cancellation_token: CancellationToken,
77 update_interval: Duration,
78 provider: P,
79 store: WorkerStore<S>,
80 ids: Vec<String>,
81 metrics: Metrics,
82) {
83 if ids.is_empty() {
84 debug!("no ids to poll");
85 return;
86 }
87 let mut interval = interval(update_interval);
88
89 loop {
90 select! {
91 _ = cancellation_token.cancelled() => {
92 info!("polling: cancelled");
93 break
94 },
95 _ = interval.tick() => {
96 info!("polling");
97 let start_time = Instant::now();
98
99 let polling_result = match timeout(interval.period(), provider.get_asset_info(&ids)).await {
100 Ok(Ok(asset_info)) => {
101 if let Err(e) = store.set_batch_asset_info(asset_info).await {
102 error!("failed to store asset info with error: {e}");
103 } else {
104 info!("asset info updated successfully");
105 }
106 PollingResult::Success
107 }
108 Ok(Err(e)) => {
109 error!("failed to poll asset info with error: {e}");
110 PollingResult::Failed
111 }
112 Err(_) => {
113 error!("updating interval exceeded timeout");
114 PollingResult::Timeout
115 }
116 };
117
118 metrics.update_rest_polling(start_time.elapsed().as_millis(), polling_result);
119 },
120 }
121 }
122}