pyth_lazer_client/
history_client.rs

1use std::{
2    collections::HashMap,
3    io::Write,
4    path::{Path, PathBuf},
5    sync::{Arc, Weak},
6    time::Duration,
7};
8
9use anyhow::{bail, Context as _};
10use arc_swap::ArcSwap;
11use atomicwrites::replace_atomic;
12use backoff::{exponential::ExponentialBackoff, future::retry_notify, SystemClock};
13use futures::{stream::FuturesUnordered, StreamExt};
14use pyth_lazer_protocol::{jrpc::SymbolMetadata, PriceFeedId};
15use serde::{de::DeserializeOwned, Deserialize, Serialize};
16use tokio::{sync::mpsc, time::sleep};
17use tracing::{info, warn};
18use url::Url;
19
20/// Configuration for the history client.
21#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
22pub struct PythLazerHistoryClientConfig {
23    /// URLs of the history services.
24    #[serde(default = "default_urls")]
25    pub urls: Vec<Url>,
26    /// Interval of queries to the history services.
27    /// Note: if the request fails, it will be retried using exponential backoff regardless of this setting.
28    #[serde(with = "humantime_serde", default = "default_update_interval")]
29    pub update_interval: Duration,
30    /// Timeout of an individual request.
31    #[serde(with = "humantime_serde", default = "default_request_timeout")]
32    pub request_timeout: Duration,
33    /// Path to the cache directory that can be used to provide latest data if history service is unavailable.
34    pub cache_dir: Option<PathBuf>,
35    /// Capacity of communication channels created by this client. It must be above zero.
36    #[serde(default = "default_channel_capacity")]
37    pub channel_capacity: usize,
38}
39
40fn default_urls() -> Vec<Url> {
41    vec![Url::parse("https://history.pyth-lazer.dourolabs.app/").unwrap()]
42}
43
44fn default_update_interval() -> Duration {
45    Duration::from_secs(30)
46}
47
48fn default_request_timeout() -> Duration {
49    Duration::from_secs(15)
50}
51
52fn default_channel_capacity() -> usize {
53    1000
54}
55
56impl Default for PythLazerHistoryClientConfig {
57    fn default() -> Self {
58        Self {
59            urls: default_urls(),
60            update_interval: default_update_interval(),
61            request_timeout: default_request_timeout(),
62            cache_dir: None,
63            channel_capacity: default_channel_capacity(),
64        }
65    }
66}
67
68/// Client to the history service API.
69#[derive(Debug, Clone)]
70pub struct PythLazerHistoryClient {
71    config: Arc<PythLazerHistoryClientConfig>,
72    client: reqwest::Client,
73}
74
75impl PythLazerHistoryClient {
76    pub fn new(config: PythLazerHistoryClientConfig) -> Self {
77        Self {
78            client: reqwest::Client::builder()
79                .timeout(config.request_timeout)
80                .build()
81                .expect("failed to initialize reqwest"),
82            config: Arc::new(config),
83        }
84    }
85
86    fn symbols_cache_file_path(&self) -> Option<PathBuf> {
87        self.config
88            .cache_dir
89            .as_ref()
90            .map(|path| path.join("symbols_v1.json"))
91    }
92
93    /// Fetch current metadata for all symbols.
94    pub async fn all_symbols_metadata(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
95        self.fetch_symbols_initial().await
96    }
97
98    /// Fetch metadata for all symbols as an auto-updating handle.
99    ///
100    /// Returns an error if the initial fetch failed.
101    /// The returned `SymbolMetadataHandle` will be updated by a background task when the data changes.
102    pub async fn all_symbols_metadata_handle(&self) -> anyhow::Result<SymbolMetadataHandle> {
103        let symbols = Arc::new(
104            self.fetch_symbols_initial()
105                .await?
106                .into_iter()
107                .map(|f| (f.pyth_lazer_id, f))
108                .collect::<HashMap<_, _>>(),
109        );
110        let previous_symbols = symbols.clone();
111        let handle = Arc::new(ArcSwap::new(symbols));
112        let client = self.clone();
113        let weak_handle = Arc::downgrade(&handle);
114        tokio::spawn(async move {
115            client
116                .update_symbols_handle(weak_handle, previous_symbols)
117                .await;
118        });
119        Ok(SymbolMetadataHandle(handle))
120    }
121
122    /// Fetch metadata for all symbols as an auto-updating handle.
123    ///
124    /// The returned `SymbolMetadataHandle` will be updated by a background task when the data changes.
125    /// If the initial fetch failed, the handle will initially contain an empty hashmap.
126    pub async fn all_symbols_metadata_fault_tolerant_handle(&self) -> SymbolMetadataHandle {
127        let initial_result = self.fetch_symbols_initial().await;
128        let symbols = match initial_result {
129            Ok(data) => data
130                .into_iter()
131                .map(|f| (f.pyth_lazer_id, f))
132                .collect::<HashMap<_, _>>(),
133            Err(err) => {
134                warn!(
135                    ?err,
136                    "failed to fetch symbols, proceeding with empty symbol list"
137                );
138                HashMap::new()
139            }
140        };
141        let symbols = Arc::new(symbols);
142        let previous_symbols = symbols.clone();
143        let handle = Arc::new(ArcSwap::new(symbols));
144        let weak_handle = Arc::downgrade(&handle);
145        let client = self.clone();
146        tokio::spawn(async move {
147            client
148                .update_symbols_handle(weak_handle, previous_symbols)
149                .await;
150        });
151        SymbolMetadataHandle(handle)
152    }
153
154    /// Fetch metadata for all symbols as a receiver.
155    ///
156    /// Returns an error if the initial fetch failed.
157    /// On a successful return, the channel will always contain the initial data that can be fetched
158    /// immediately from the returned receiver.
159    /// You should continuously poll the receiver to receive updates.
160    pub async fn all_symbols_metadata_stream(
161        &self,
162    ) -> anyhow::Result<mpsc::Receiver<Vec<SymbolMetadata>>> {
163        if self.config.channel_capacity == 0 {
164            bail!("channel_capacity cannot be 0");
165        }
166        let symbols = self.fetch_symbols_initial().await?;
167        let (sender, receiver) = mpsc::channel(self.config.channel_capacity);
168
169        let previous_symbols = symbols.clone();
170        sender
171            .send(symbols)
172            .await
173            .expect("send to new channel failed");
174        let client = self.clone();
175        tokio::spawn(async move {
176            client.update_symbols_stream(sender, previous_symbols).await;
177        });
178        Ok(receiver)
179    }
180
181    async fn update_symbols_handle(
182        &self,
183        handle: Weak<ArcSwap<HashMap<PriceFeedId, SymbolMetadata>>>,
184        mut previous_symbols: Arc<HashMap<PriceFeedId, SymbolMetadata>>,
185    ) {
186        info!("starting background task for updating symbols");
187        loop {
188            sleep(self.config.update_interval).await;
189            if handle.upgrade().is_none() {
190                info!("symbols handle dropped, stopping background task");
191                return;
192            }
193            match self.fetch_symbols().await {
194                Ok(new_symbols) => {
195                    let new_symbols = new_symbols
196                        .into_iter()
197                        .map(|f| (f.pyth_lazer_id, f))
198                        .collect::<HashMap<_, _>>();
199                    if *previous_symbols != new_symbols {
200                        let Some(handle) = handle.upgrade() else {
201                            info!("symbols handle dropped, stopping background task");
202                            return;
203                        };
204                        info!("symbols changed");
205                        if let Some(cache_file_path) = self.symbols_cache_file_path() {
206                            if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) {
207                                warn!(?err, ?cache_file_path, "failed to save data to cache file");
208                            }
209                        }
210                        let new_symbols = Arc::new(new_symbols);
211                        previous_symbols = new_symbols.clone();
212                        handle.store(new_symbols);
213                    }
214                }
215                Err(err) => {
216                    warn!(?err, "failed to fetch symbols");
217                }
218            }
219        }
220    }
221
222    async fn update_symbols_stream(
223        &self,
224        handle: mpsc::Sender<Vec<SymbolMetadata>>,
225        mut previous_symbols: Vec<SymbolMetadata>,
226    ) {
227        info!("starting background task for updating symbols");
228        loop {
229            sleep(self.config.update_interval).await;
230            if handle.is_closed() {
231                info!("symbols channel closed, stopping background task");
232                return;
233            }
234            match self.fetch_symbols().await {
235                Ok(new_symbols) => {
236                    if *previous_symbols != new_symbols {
237                        info!("symbols changed");
238                        if let Some(cache_file_path) = self.symbols_cache_file_path() {
239                            if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) {
240                                warn!(?err, ?cache_file_path, "failed to save data to cache file");
241                            }
242                        }
243                        previous_symbols = new_symbols.clone();
244                        if handle.send(new_symbols).await.is_err() {
245                            info!("symbols channel closed, stopping background task");
246                            return;
247                        }
248                    }
249                }
250                Err(err) => {
251                    warn!(?err, "failed to fetch symbols");
252                }
253            }
254        }
255    }
256
257    async fn fetch_symbols_initial(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
258        let result = self.fetch_symbols().await;
259        match result {
260            Ok(data) => {
261                info!("fetched initial symbols from history service");
262                if let Some(cache_file_path) = self.symbols_cache_file_path() {
263                    if let Err(err) = atomic_save_file(&cache_file_path, &data) {
264                        warn!(?err, ?cache_file_path, "failed to save data to cache file");
265                    }
266                }
267                Ok(data)
268            }
269            Err(err) => match self.symbols_cache_file_path() {
270                Some(cache_file_path) => match load_file::<Vec<SymbolMetadata>>(&cache_file_path) {
271                    Ok(Some(data)) => {
272                        info!(?err, "failed to fetch initial symbols from history service, but fetched last known symbols from cache");
273                        Ok(data)
274                    }
275                    Ok(None) => Err(err),
276                    Err(cache_err) => {
277                        warn!(?cache_err, "failed to fetch data from cache");
278                        Err(err)
279                    }
280                },
281                None => Err(err),
282            },
283        }
284    }
285
286    async fn fetch_symbols(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
287        if self.config.urls.is_empty() {
288            bail!("no history urls provided");
289        }
290        let mut futures = self
291            .config
292            .urls
293            .iter()
294            .map(|url| Box::pin(self.fetch_symbols_single(url)))
295            .collect::<FuturesUnordered<_>>();
296        while let Some(result) = futures.next().await {
297            match result {
298                Ok(output) => return Ok(output),
299                Err(err) => {
300                    warn!("failed to fetch symbols: {:?}", err);
301                }
302            }
303        }
304
305        bail!(
306            "failed to fetch symbols from any urls ({:?})",
307            self.config.urls
308        );
309    }
310
311    async fn fetch_symbols_single(&self, url: &Url) -> anyhow::Result<Vec<SymbolMetadata>> {
312        let url = url.join("v1/symbols")?;
313        retry_notify(
314            ExponentialBackoff::<SystemClock> {
315                // We will retry all requests after `update_interval`, so there is
316                // no reason to continue retrying here.
317                max_elapsed_time: Some(self.config.update_interval),
318                ..Default::default()
319            },
320            || async {
321                let response = self
322                    .client
323                    .get(url.clone())
324                    .send()
325                    .await
326                    .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))?
327                    .backoff_error_for_status()?;
328                response
329                    .json::<Vec<SymbolMetadata>>()
330                    .await
331                    .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))
332            },
333            |e, _| warn!("failed to fetch symbols from {} (will retry): {:?}", url, e),
334        )
335        .await
336    }
337}
338
339#[derive(Debug, Clone)]
340pub struct SymbolMetadataHandle(Arc<ArcSwap<HashMap<PriceFeedId, SymbolMetadata>>>);
341
342impl SymbolMetadataHandle {
343    pub fn symbols(&self) -> arc_swap::Guard<Arc<HashMap<PriceFeedId, SymbolMetadata>>> {
344        self.0.load()
345    }
346
347    pub fn new_for_test(data: HashMap<PriceFeedId, SymbolMetadata>) -> Self {
348        Self(Arc::new(ArcSwap::new(Arc::new(data))))
349    }
350}
351
352trait BackoffErrorForStatusExt: Sized {
353    fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>>;
354}
355
356impl BackoffErrorForStatusExt for reqwest::Response {
357    fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>> {
358        let status = self.status();
359        self.error_for_status().map_err(|err| {
360            if status.is_server_error() {
361                backoff::Error::transient(err.into())
362            } else {
363                backoff::Error::permanent(err.into())
364            }
365        })
366    }
367}
368
369fn load_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<Option<T>> {
370    let parent_path = path.parent().context("invalid file path: no parent")?;
371    fs_err::create_dir_all(parent_path)?;
372
373    if !path.try_exists()? {
374        return Ok(None);
375    }
376    let json_data = fs_err::read_to_string(path)?;
377    let data = serde_json::from_str::<T>(&json_data)?;
378    Ok(Some(data))
379}
380
381fn atomic_save_file(path: &Path, data: &impl Serialize) -> anyhow::Result<()> {
382    let parent_path = path.parent().context("invalid file path: no parent")?;
383    fs_err::create_dir_all(parent_path)?;
384
385    let json_data = serde_json::to_string(&data)?;
386    let tmp_path = path.with_extension("tmp");
387    let mut tmp_file = fs_err::File::create(&tmp_path)?;
388    tmp_file.write_all(json_data.as_bytes())?;
389    tmp_file.flush()?;
390    tmp_file.sync_all()?;
391    replace_atomic(&tmp_path, path)?;
392
393    Ok(())
394}