price_adapter/services/
interval.rs

1use crate::types::{Service, Source};
2use crate::{error::Error, types::PriceInfo};
3use std::time::Duration;
4use std::{collections::HashMap, sync::Arc};
5use tokio::time::sleep;
6use tokio::{select, sync::Mutex};
7use tokio_util::sync::CancellationToken;
8
9/// A caching object storing prices received from Source at regular intervals.
10pub struct IntervalService<S: Source> {
11    adapter: Arc<Mutex<S>>,
12    interval: Duration,
13    cached_prices: Arc<Mutex<HashMap<String, PriceInfo>>>,
14    cancellation_token: Option<CancellationToken>,
15}
16
17impl<S: Source> IntervalService<S> {
18    /// Creates a new `IntervalService` with the provided Source.
19    pub fn new(adapter: S, interval: Duration) -> Self {
20        Self {
21            adapter: Arc::new(Mutex::new(adapter)),
22            interval,
23            cached_prices: Arc::new(Mutex::new(HashMap::new())),
24            cancellation_token: None,
25        }
26    }
27}
28
29#[async_trait::async_trait]
30impl<S: Source> Service for IntervalService<S> {
31    /// Starts the service, fetching prices at regular intervals and caching them.
32    async fn start(&mut self, symbols: &[&str]) -> Result<(), Error> {
33        if self.is_started().await {
34            return Err(Error::AlreadyStarted);
35        }
36
37        let token = CancellationToken::new();
38        let cloned_token = token.clone();
39        let cloned_adapter = Arc::clone(&self.adapter);
40        let cloned_symbols: Vec<String> = symbols.iter().map(|&s| s.to_string()).collect();
41        let cloned_cached_prices = Arc::clone(&self.cached_prices);
42        let interval_duration = self.interval;
43        self.cancellation_token = Some(token);
44
45        tokio::spawn(async move {
46            loop {
47                let borrowed_symbols: Vec<&str> =
48                    cloned_symbols.iter().map(|s| s.as_str()).collect();
49                let locked_adapter = cloned_adapter.lock().await;
50
51                select! {
52                    _ = cloned_token.cancelled() => {
53                        break;
54                    }
55
56                    prices = locked_adapter.get_prices(&borrowed_symbols) => {
57                        drop(locked_adapter);
58
59                        let mut locked_cached_prices = cloned_cached_prices.lock().await;
60                        for price in prices.into_iter().flatten() {
61                            locked_cached_prices.insert(price.symbol.to_string(), price);
62                        }
63                    }
64                }
65
66                sleep(interval_duration).await;
67            }
68        });
69
70        Ok(())
71    }
72
73    /// Stops the service, cancelling the interval fetching.
74    async fn stop(&mut self) {
75        if let Some(token) = &self.cancellation_token {
76            token.cancel();
77        }
78        self.cancellation_token = None;
79    }
80
81    // To check if the service is started.
82    async fn is_started(&self) -> bool {
83        self.cancellation_token.is_some()
84    }
85}
86
87#[async_trait::async_trait]
88impl<S: Source> Source for IntervalService<S> {
89    /// Retrieves prices for the specified symbols from the cached prices.
90    async fn get_prices(&self, symbols: &[&str]) -> Vec<Result<PriceInfo, Error>> {
91        let locked_cached_prices = self.cached_prices.lock().await;
92        symbols
93            .iter()
94            .map(|&symbol| {
95                locked_cached_prices
96                    .get(&symbol.to_ascii_uppercase())
97                    .map_or_else(
98                        || Err(Error::NotFound(symbol.to_string())),
99                        |price| Ok(price.clone()),
100                    )
101            })
102            .collect()
103    }
104
105    // Asynchronous function to get price for a symbol.
106    async fn get_price(&self, symbol: &str) -> Result<PriceInfo, Error> {
107        self.get_prices(&[symbol])
108            .await
109            .pop()
110            .ok_or(Error::Unknown)?
111    }
112}