pyth_lazer_client/
history_client.rs

1use std::{
2    collections::HashMap,
3    io::Write,
4    path::{Path, PathBuf},
5    sync::{Arc, Weak},
6    time::Duration,
7};
8
9use anyhow::{bail, Context as _};
10use arc_swap::ArcSwap;
11use atomicwrites::replace_atomic;
12use backoff::{exponential::ExponentialBackoff, future::retry_notify, SystemClock};
13use futures::{stream::FuturesUnordered, StreamExt};
14use pyth_lazer_protocol::{jrpc::SymbolMetadata, PriceFeedId};
15use serde::{de::DeserializeOwned, Deserialize, Serialize};
16use tokio::{sync::mpsc, time::sleep};
17use tracing::{info, warn};
18use url::Url;
19
/// Base URLs of the history services used when the configuration does not list any.
const DEFAULT_URLS: &[&str] = &["https://history.pyth-lazer.dourolabs.app/"];
/// Default value for `PythLazerHistoryClientConfig::update_interval`.
const DEFAULT_UPDATE_INTERVAL: Duration = Duration::from_secs(30);
/// Default value for `PythLazerHistoryClientConfig::request_timeout`.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
23
24/// Configuration for the history client.
25#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
26pub struct PythLazerHistoryClientConfig {
27    /// URLs of the history services.
28    #[serde(default = "default_urls")]
29    pub urls: Vec<Url>,
30    /// Interval of queries to the history services.
31    /// Note: if the request fails, it will be retried using exponential backoff regardless of this setting.
32    #[serde(with = "humantime_serde", default = "default_update_interval")]
33    pub update_interval: Duration,
34    /// Timeout of an individual request.
35    #[serde(with = "humantime_serde", default = "default_request_timeout")]
36    pub request_timeout: Duration,
37    /// Path to the cache directory that can be used to provide latest data if history service is unavailable.
38    pub cache_dir: Option<PathBuf>,
39    /// Capacity of communication channels created by this client. It must be above zero.
40    #[serde(default = "default_channel_capacity")]
41    pub channel_capacity: usize,
42}
43
44fn default_urls() -> Vec<Url> {
45    DEFAULT_URLS
46        .iter()
47        .map(|url| Url::parse(url).unwrap())
48        .collect()
49}
50
51fn default_update_interval() -> Duration {
52    Duration::from_secs(30)
53}
54
55fn default_request_timeout() -> Duration {
56    Duration::from_secs(15)
57}
58
/// Serde default for `channel_capacity`.
fn default_channel_capacity() -> usize {
    const CAPACITY: usize = 1_000;
    CAPACITY
}
62
63impl Default for PythLazerHistoryClientConfig {
64    fn default() -> Self {
65        Self {
66            urls: default_urls(),
67            update_interval: default_update_interval(),
68            request_timeout: default_request_timeout(),
69            cache_dir: None,
70            channel_capacity: default_channel_capacity(),
71        }
72    }
73}
74
75/// Client to the history service API.
76#[derive(Debug, Clone)]
77pub struct PythLazerHistoryClient {
78    config: Arc<PythLazerHistoryClientConfig>,
79    client: reqwest::Client,
80}
81
82impl PythLazerHistoryClient {
83    pub fn new(config: PythLazerHistoryClientConfig) -> Self {
84        Self {
85            config: Arc::new(config),
86            client: reqwest::Client::builder()
87                .timeout(DEFAULT_REQUEST_TIMEOUT)
88                .build()
89                .expect("failed to initialize reqwest"),
90        }
91    }
92
93    fn symbols_cache_file_path(&self) -> Option<PathBuf> {
94        self.config
95            .cache_dir
96            .as_ref()
97            .map(|path| path.join("symbols_v1.json"))
98    }
99
100    /// Fetch current metadata for all symbols.
101    pub async fn all_symbols_metadata(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
102        self.fetch_symbols_initial().await
103    }
104
105    /// Fetch metadata for all symbols as an auto-updating handle.
106    ///
107    /// Returns an error if the initial fetch failed.
108    /// The returned `SymbolMetadataHandle` will be updated by a background task when the data changes.
109    pub async fn all_symbols_metadata_handle(&self) -> anyhow::Result<SymbolMetadataHandle> {
110        let symbols = Arc::new(
111            self.fetch_symbols_initial()
112                .await?
113                .into_iter()
114                .map(|f| (f.pyth_lazer_id, f))
115                .collect::<HashMap<_, _>>(),
116        );
117        let previous_symbols = symbols.clone();
118        let handle = Arc::new(ArcSwap::new(symbols));
119        let client = self.clone();
120        let weak_handle = Arc::downgrade(&handle);
121        tokio::spawn(async move {
122            client
123                .update_symbols_handle(weak_handle, previous_symbols)
124                .await;
125        });
126        Ok(SymbolMetadataHandle(handle))
127    }
128
129    /// Fetch metadata for all symbols as an auto-updating handle.
130    ///
131    /// The returned `SymbolMetadataHandle` will be updated by a background task when the data changes.
132    /// If the initial fetch failed, the handle will initially contain an empty hashmap.
133    pub async fn all_symbols_metadata_fault_tolerant_handle(&self) -> SymbolMetadataHandle {
134        let initial_result = self.fetch_symbols_initial().await;
135        let symbols = match initial_result {
136            Ok(data) => data
137                .into_iter()
138                .map(|f| (f.pyth_lazer_id, f))
139                .collect::<HashMap<_, _>>(),
140            Err(err) => {
141                warn!(
142                    ?err,
143                    "failed to fetch symbols, proceeding with empty symbol list"
144                );
145                HashMap::new()
146            }
147        };
148        let symbols = Arc::new(symbols);
149        let previous_symbols = symbols.clone();
150        let handle = Arc::new(ArcSwap::new(symbols));
151        let weak_handle = Arc::downgrade(&handle);
152        let client = self.clone();
153        tokio::spawn(async move {
154            client
155                .update_symbols_handle(weak_handle, previous_symbols)
156                .await;
157        });
158        SymbolMetadataHandle(handle)
159    }
160
161    /// Fetch metadata for all symbols as a receiver.
162    ///
163    /// Returns an error if the initial fetch failed.
164    /// On a successful return, the channel will always contain the initial data that can be fetched
165    /// immediately from the returned receiver.
166    /// You should continuously poll the receiver to receive updates.
167    pub async fn all_symbols_metadata_stream(
168        &self,
169    ) -> anyhow::Result<mpsc::Receiver<Vec<SymbolMetadata>>> {
170        if self.config.channel_capacity == 0 {
171            bail!("channel_capacity cannot be 0");
172        }
173        let symbols = self.fetch_symbols_initial().await?;
174        let (sender, receiver) = mpsc::channel(self.config.channel_capacity);
175
176        let previous_symbols = symbols.clone();
177        sender
178            .send(symbols)
179            .await
180            .expect("send to new channel failed");
181        let client = self.clone();
182        tokio::spawn(async move {
183            client.update_symbols_stream(sender, previous_symbols).await;
184        });
185        Ok(receiver)
186    }
187
188    async fn update_symbols_handle(
189        &self,
190        handle: Weak<ArcSwap<HashMap<PriceFeedId, SymbolMetadata>>>,
191        mut previous_symbols: Arc<HashMap<PriceFeedId, SymbolMetadata>>,
192    ) {
193        info!("starting background task for updating symbols");
194        loop {
195            sleep(DEFAULT_UPDATE_INTERVAL).await;
196            if handle.upgrade().is_none() {
197                info!("symbols handle dropped, stopping background task");
198                return;
199            }
200            match self.fetch_symbols().await {
201                Ok(new_symbols) => {
202                    let new_symbols = new_symbols
203                        .into_iter()
204                        .map(|f| (f.pyth_lazer_id, f))
205                        .collect::<HashMap<_, _>>();
206                    if *previous_symbols != new_symbols {
207                        let Some(handle) = handle.upgrade() else {
208                            info!("symbols handle dropped, stopping background task");
209                            return;
210                        };
211                        info!("symbols changed");
212                        if let Some(cache_file_path) = self.symbols_cache_file_path() {
213                            if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) {
214                                warn!(?err, ?cache_file_path, "failed to save data to cache file");
215                            }
216                        }
217                        let new_symbols = Arc::new(new_symbols);
218                        previous_symbols = new_symbols.clone();
219                        handle.store(new_symbols);
220                    }
221                }
222                Err(err) => {
223                    warn!(?err, "failed to fetch symbols");
224                }
225            }
226        }
227    }
228
229    async fn update_symbols_stream(
230        &self,
231        handle: mpsc::Sender<Vec<SymbolMetadata>>,
232        mut previous_symbols: Vec<SymbolMetadata>,
233    ) {
234        info!("starting background task for updating symbols");
235        loop {
236            sleep(DEFAULT_UPDATE_INTERVAL).await;
237            if handle.is_closed() {
238                info!("symbols channel closed, stopping background task");
239                return;
240            }
241            match self.fetch_symbols().await {
242                Ok(new_symbols) => {
243                    if *previous_symbols != new_symbols {
244                        info!("symbols changed");
245                        if let Some(cache_file_path) = self.symbols_cache_file_path() {
246                            if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) {
247                                warn!(?err, ?cache_file_path, "failed to save data to cache file");
248                            }
249                        }
250                        previous_symbols = new_symbols.clone();
251                        if handle.send(new_symbols).await.is_err() {
252                            info!("symbols channel closed, stopping background task");
253                            return;
254                        }
255                    }
256                }
257                Err(err) => {
258                    warn!(?err, "failed to fetch symbols");
259                }
260            }
261        }
262    }
263
264    async fn fetch_symbols_initial(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
265        let result = self.fetch_symbols().await;
266        match result {
267            Ok(data) => {
268                info!("fetched initial symbols from history service");
269                if let Some(cache_file_path) = self.symbols_cache_file_path() {
270                    if let Err(err) = atomic_save_file(&cache_file_path, &data) {
271                        warn!(?err, ?cache_file_path, "failed to save data to cache file");
272                    }
273                }
274                Ok(data)
275            }
276            Err(err) => match self.symbols_cache_file_path() {
277                Some(cache_file_path) => match load_file::<Vec<SymbolMetadata>>(&cache_file_path) {
278                    Ok(Some(data)) => {
279                        info!(?err, "failed to fetch initial symbols from history service, but fetched last known symbols from cache");
280                        Ok(data)
281                    }
282                    Ok(None) => Err(err),
283                    Err(cache_err) => {
284                        warn!(?cache_err, "failed to fetch data from cache");
285                        Err(err)
286                    }
287                },
288                None => Err(err),
289            },
290        }
291    }
292
293    async fn fetch_symbols(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
294        if self.config.urls.is_empty() {
295            bail!("no history urls provided");
296        }
297        let mut futures = self
298            .config
299            .urls
300            .iter()
301            .map(|url| Box::pin(self.fetch_symbols_single(url)))
302            .collect::<FuturesUnordered<_>>();
303        while let Some(result) = futures.next().await {
304            match result {
305                Ok(output) => return Ok(output),
306                Err(err) => {
307                    warn!("failed to fetch symbols: {:?}", err);
308                }
309            }
310        }
311
312        bail!(
313            "failed to fetch symbols from any urls ({:?})",
314            self.config.urls
315        );
316    }
317
318    async fn fetch_symbols_single(&self, url: &Url) -> anyhow::Result<Vec<SymbolMetadata>> {
319        let url = url.join("v1/symbols")?;
320        retry_notify(
321            ExponentialBackoff::<SystemClock> {
322                // We will retry all requests after `update_interval`, so there is
323                // no reason to continue retrying here.
324                max_elapsed_time: Some(self.config.update_interval),
325                ..Default::default()
326            },
327            || async {
328                let response = self
329                    .client
330                    .get(url.clone())
331                    .send()
332                    .await
333                    .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))?
334                    .backoff_error_for_status()?;
335                response
336                    .json::<Vec<SymbolMetadata>>()
337                    .await
338                    .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))
339            },
340            |e, _| warn!("failed to fetch symbols from {} (will retry): {:?}", url, e),
341        )
342        .await
343    }
344}
345
346#[derive(Debug, Clone)]
347pub struct SymbolMetadataHandle(Arc<ArcSwap<HashMap<PriceFeedId, SymbolMetadata>>>);
348
349impl SymbolMetadataHandle {
350    pub fn symbols(&self) -> arc_swap::Guard<Arc<HashMap<PriceFeedId, SymbolMetadata>>> {
351        self.0.load()
352    }
353
354    pub fn new_for_test(data: HashMap<PriceFeedId, SymbolMetadata>) -> Self {
355        Self(Arc::new(ArcSwap::new(Arc::new(data))))
356    }
357}
358
359trait BackoffErrorForStatusExt: Sized {
360    fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>>;
361}
362
363impl BackoffErrorForStatusExt for reqwest::Response {
364    fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>> {
365        let status = self.status();
366        self.error_for_status().map_err(|err| {
367            if status.is_server_error() {
368                backoff::Error::transient(err.into())
369            } else {
370                backoff::Error::permanent(err.into())
371            }
372        })
373    }
374}
375
376fn load_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<Option<T>> {
377    let parent_path = path.parent().context("invalid file path: no parent")?;
378    fs_err::create_dir_all(parent_path)?;
379
380    if !path.try_exists()? {
381        return Ok(None);
382    }
383    let json_data = fs_err::read_to_string(path)?;
384    let data = serde_json::from_str::<T>(&json_data)?;
385    Ok(Some(data))
386}
387
388fn atomic_save_file(path: &Path, data: &impl Serialize) -> anyhow::Result<()> {
389    let parent_path = path.parent().context("invalid file path: no parent")?;
390    fs_err::create_dir_all(parent_path)?;
391
392    let json_data = serde_json::to_string(&data)?;
393    let tmp_path = path.with_extension("tmp");
394    let mut tmp_file = fs_err::File::create(&tmp_path)?;
395    tmp_file.write_all(json_data.as_bytes())?;
396    tmp_file.flush()?;
397    tmp_file.sync_all()?;
398    replace_atomic(&tmp_path, path)?;
399
400    Ok(())
401}