price_adapter/services/
interval.rs1use crate::types::{Service, Source};
2use crate::{error::Error, types::PriceInfo};
3use std::time::Duration;
4use std::{collections::HashMap, sync::Arc};
5use tokio::time::sleep;
6use tokio::{select, sync::Mutex};
7use tokio_util::sync::CancellationToken;
8
9pub struct IntervalService<S: Source> {
11 adapter: Arc<Mutex<S>>,
12 interval: Duration,
13 cached_prices: Arc<Mutex<HashMap<String, PriceInfo>>>,
14 cancellation_token: Option<CancellationToken>,
15}
16
17impl<S: Source> IntervalService<S> {
18 pub fn new(adapter: S, interval: Duration) -> Self {
20 Self {
21 adapter: Arc::new(Mutex::new(adapter)),
22 interval,
23 cached_prices: Arc::new(Mutex::new(HashMap::new())),
24 cancellation_token: None,
25 }
26 }
27}
28
29#[async_trait::async_trait]
30impl<S: Source> Service for IntervalService<S> {
31 async fn start(&mut self, symbols: &[&str]) -> Result<(), Error> {
33 if self.is_started().await {
34 return Err(Error::AlreadyStarted);
35 }
36
37 let token = CancellationToken::new();
38 let cloned_token = token.clone();
39 let cloned_adapter = Arc::clone(&self.adapter);
40 let cloned_symbols: Vec<String> = symbols.iter().map(|&s| s.to_string()).collect();
41 let cloned_cached_prices = Arc::clone(&self.cached_prices);
42 let interval_duration = self.interval;
43 self.cancellation_token = Some(token);
44
45 tokio::spawn(async move {
46 loop {
47 let borrowed_symbols: Vec<&str> =
48 cloned_symbols.iter().map(|s| s.as_str()).collect();
49 let locked_adapter = cloned_adapter.lock().await;
50
51 select! {
52 _ = cloned_token.cancelled() => {
53 break;
54 }
55
56 prices = locked_adapter.get_prices(&borrowed_symbols) => {
57 drop(locked_adapter);
58
59 let mut locked_cached_prices = cloned_cached_prices.lock().await;
60 for price in prices.into_iter().flatten() {
61 locked_cached_prices.insert(price.symbol.to_string(), price);
62 }
63 }
64 }
65
66 sleep(interval_duration).await;
67 }
68 });
69
70 Ok(())
71 }
72
73 async fn stop(&mut self) {
75 if let Some(token) = &self.cancellation_token {
76 token.cancel();
77 }
78 self.cancellation_token = None;
79 }
80
81 async fn is_started(&self) -> bool {
83 self.cancellation_token.is_some()
84 }
85}
86
87#[async_trait::async_trait]
88impl<S: Source> Source for IntervalService<S> {
89 async fn get_prices(&self, symbols: &[&str]) -> Vec<Result<PriceInfo, Error>> {
91 let locked_cached_prices = self.cached_prices.lock().await;
92 symbols
93 .iter()
94 .map(|&symbol| {
95 locked_cached_prices
96 .get(&symbol.to_ascii_uppercase())
97 .map_or_else(
98 || Err(Error::NotFound(symbol.to_string())),
99 |price| Ok(price.clone()),
100 )
101 })
102 .collect()
103 }
104
105 async fn get_price(&self, symbol: &str) -> Result<PriceInfo, Error> {
107 self.get_prices(&[symbol])
108 .await
109 .pop()
110 .ok_or(Error::Unknown)?
111 }
112}