use std::fmt::Display;
use std::time::{Duration, Instant};
use tokio::select;
use tokio::time::{interval, timeout};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
use crate::metrics::rest::{Metrics, PollingResult};
use crate::store::{Store, WorkerStore};
use crate::types::AssetInfo;
/// A source of asset information that can be queried for a batch of asset ids.
///
/// Implementors must be `Send + Sync`. The `async_trait` attribute is what
/// allows the trait to declare an `async fn`.
#[async_trait::async_trait]
pub trait AssetInfoProvider: Send + Sync {
/// Error produced when fetching asset info fails. It must implement
/// `Display`; `start_polling` formats it into its error log message.
type Error: Display;
/// Fetches asset info for the given `ids`.
///
/// Returns the fetched [`AssetInfo`] entries on success, or `Self::Error`
/// on failure.
///
/// NOTE(review): whether the result holds exactly one entry per id, or
/// preserves the order of `ids`, is not specified here — confirm with
/// the implementors.
async fn get_asset_info(&self, ids: &[String]) -> Result<Vec<AssetInfo>, Self::Error>;
}
#[tracing::instrument(skip(cancellation_token, provider, store, ids))]
pub async fn start_polling<S: Store, E: Display, P: AssetInfoProvider<Error = E>>(
cancellation_token: CancellationToken,
update_interval: Duration,
provider: P,
store: WorkerStore<S>,
ids: Vec<String>,
metrics: Metrics,
) {
if ids.is_empty() {
debug!("no ids to poll");
return;
}
let mut interval = interval(update_interval);
loop {
select! {
_ = cancellation_token.cancelled() => {
info!("polling: cancelled");
break
},
_ = interval.tick() => {
info!("polling");
let start_time = Instant::now();
let polling_result = match timeout(interval.period(), provider.get_asset_info(&ids)).await {
Ok(Ok(asset_info)) => {
if let Err(e) = store.set_batch_asset_info(asset_info).await {
error!("failed to store asset info with error: {e}");
} else {
info!("asset info updated successfully");
}
PollingResult::Success
}
Ok(Err(e)) => {
error!("failed to poll asset info with error: {e}");
PollingResult::Failed
}
Err(_) => {
error!("updating interval exceeded timeout");
PollingResult::Timeout
}
};
metrics.update_rest_polling(start_time.elapsed().as_millis(), polling_result);
},
}
}
}