// pyth_lazer_client/history_client.rs

1use {
2    anyhow::{bail, format_err, Context as _},
3    atomicwrites::replace_atomic,
4    backoff::{exponential::ExponentialBackoff, future::retry_notify, SystemClock},
5    futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt},
6    pyth_lazer_protocol::jrpc::SymbolMetadata,
7    pyth_lazer_publisher_sdk::state::State,
8    serde::{
9        de::{DeserializeOwned, Error as _},
10        ser::Error as _,
11        Deserialize, Serialize,
12    },
13    std::{
14        future::Future,
15        io::Write,
16        path::{Path, PathBuf},
17        sync::Arc,
18        time::Duration,
19    },
20    tokio::{sync::mpsc, time::sleep},
21    tokio_stream::wrappers::ReceiverStream,
22    tracing::{info, info_span, warn, Instrument},
23    url::Url,
24};
25
26/// Configuration for the history client.
/// Configuration for the history client.
///
/// All fields have serde defaults, so a partial (or empty) config document
/// deserializes successfully; `Default` uses the same helper functions.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PythLazerHistoryClientConfig {
    /// URLs of the history services.
    /// All URLs are queried concurrently and the first successful response wins.
    #[serde(default = "default_urls")]
    pub urls: Vec<Url>,
    /// Interval of queries to the history services.
    /// Note: if the request fails, it will be retried using exponential backoff regardless of this setting.
    #[serde(with = "humantime_serde", default = "default_update_interval")]
    pub update_interval: Duration,
    /// Timeout of an individual request.
    #[serde(with = "humantime_serde", default = "default_request_timeout")]
    pub request_timeout: Duration,
    /// Path to the cache directory that can be used to provide latest data if history service is unavailable.
    /// When `None`, no data is cached or read from disk.
    pub cache_dir: Option<PathBuf>,
    /// Capacity of communication channels created by this client. It must be above zero.
    #[serde(default = "default_channel_capacity")]
    pub channel_capacity: usize,
    /// Access token for publisher or governance restricted endpoints.
    ///
    /// Not needed for consumer facing endpoints.
    pub access_token: Option<String>,
}
49
50fn default_urls() -> Vec<Url> {
51    vec![Url::parse("https://history.pyth-lazer.dourolabs.app/").unwrap()]
52}
53
/// Default polling interval between history service queries.
fn default_update_interval() -> Duration {
    const SECS: u64 = 30;
    Duration::from_secs(SECS)
}
57
/// Default timeout applied to each individual HTTP request.
fn default_request_timeout() -> Duration {
    const SECS: u64 = 15;
    Duration::from_secs(SECS)
}
61
/// Default capacity of the mpsc channels backing the update streams.
fn default_channel_capacity() -> usize {
    1_000
}
65
impl Default for PythLazerHistoryClientConfig {
    fn default() -> Self {
        // Built from the same helper functions referenced by the struct's
        // `#[serde(default = "...")]` attributes, so `Default` and a fully
        // defaulted deserialization agree.
        Self {
            urls: default_urls(),
            update_interval: default_update_interval(),
            request_timeout: default_request_timeout(),
            cache_dir: None,
            channel_capacity: default_channel_capacity(),
            access_token: None,
        }
    }
}
78
/// Client to the history service API.
///
/// Cheap to clone: the config is behind an `Arc` and `reqwest::Client`
/// is internally reference-counted.
#[derive(Debug, Clone)]
pub struct PythLazerHistoryClient {
    // Immutable configuration shared with spawned background tasks.
    config: Arc<PythLazerHistoryClientConfig>,
    // HTTP client configured with `config.request_timeout`.
    client: reqwest::Client,
}
85
/// Specifies which parts of the state should be present in the output.
///
/// Sent as query parameters to the `/v1/state` endpoint; each flag defaults
/// to `false` when absent.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct GetStateParams {
    // Presumably requests the entire state regardless of the other flags —
    // TODO(review): confirm against the server's /v1/state semantics.
    #[serde(default)]
    pub all: bool,
    #[serde(default)]
    pub publishers: bool,
    #[serde(default)]
    pub feeds: bool,
    #[serde(default)]
    pub governance_sources: bool,
    #[serde(default)]
    pub feature_flags: bool,
}
100
101impl PythLazerHistoryClient {
102    pub fn new(config: PythLazerHistoryClientConfig) -> Self {
103        Self {
104            client: reqwest::Client::builder()
105                .timeout(config.request_timeout)
106                .build()
107                .expect("failed to initialize reqwest"),
108            config: Arc::new(config),
109        }
110    }
111
112    fn symbols_cache_file_path(&self) -> Option<PathBuf> {
113        self.config
114            .cache_dir
115            .as_ref()
116            .map(|path| path.join("symbols_v1.json"))
117    }
118
119    fn state_cache_file_path(&self, params: &GetStateParams) -> Option<PathBuf> {
120        let GetStateParams {
121            all,
122            publishers,
123            feeds,
124            governance_sources,
125            feature_flags,
126        } = params;
127
128        self.config.cache_dir.as_ref().map(|path| {
129            path.join(format!(
130                "state_{}{}{}{}{}_v1.json",
131                *all as u8,
132                *publishers as u8,
133                *feeds as u8,
134                *governance_sources as u8,
135                *feature_flags as u8,
136            ))
137        })
138    }
139
140    /// Fetch current metadata for all symbols.
141    pub async fn all_symbols_metadata(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
142        self.fetch_from_all_urls_or_file(self.symbols_cache_file_path(), |url| {
143            self.request_symbols(url)
144        })
145        .instrument(info_span!("all_symbols_metadata"))
146        .await
147    }
148
149    /// Creates a fault-tolerant stream that requests the list of symbols and yields new items
150    /// when a change of value occurs.
151    ///
152    /// Returns an error if the initial fetch failed.
153    /// On a successful return, the channel will always contain the initial data that can be fetched
154    /// immediately from the returned stream.
155    /// You should continuously poll the stream to receive updates.
156    pub async fn all_symbols_metadata_stream(
157        &self,
158    ) -> anyhow::Result<impl Stream<Item = Vec<SymbolMetadata>> + Unpin> {
159        self.stream(self.symbols_cache_file_path(), |client, url| {
160            Box::pin(client.request_symbols(url))
161        })
162        .instrument(info_span!("all_symbols_metadata_stream"))
163        .await
164    }
165
166    /// Creates a fault-tolerant stream that requests data using `f` and yields new items
167    /// when a change of value occurs.
168    ///
169    /// Returns an error if the initial fetch failed.
170    /// On a successful return, the channel will always contain the initial data that can be fetched
171    /// immediately from the returned stream.
172    /// You should continuously poll the stream to receive updates.
173    async fn stream<F, R>(
174        &self,
175        cache_file_path: Option<PathBuf>,
176        f: F,
177    ) -> anyhow::Result<impl Stream<Item = R> + Unpin>
178    where
179        for<'a> F: Fn(&'a Self, &'a Url) -> BoxFuture<'a, Result<R, backoff::Error<anyhow::Error>>>
180            + Send
181            + Sync
182            + 'static,
183        R: Clone + Serialize + DeserializeOwned + PartialEq + Send + Sync + 'static,
184    {
185        if self.config.channel_capacity == 0 {
186            bail!("channel_capacity cannot be 0");
187        }
188        let symbols = self
189            .fetch_from_all_urls_or_file(cache_file_path.clone(), |url| f(self, url))
190            .await?;
191        let (sender, receiver) = mpsc::channel(self.config.channel_capacity);
192
193        let previous_symbols = symbols.clone();
194        sender
195            .send(symbols)
196            .await
197            .expect("send to new channel failed");
198        let client = self.clone();
199        tokio::spawn(
200            async move {
201                client
202                    .keep_stream_updated(cache_file_path, sender, previous_symbols, |url| {
203                        f(&client, url)
204                    })
205                    .await;
206            }
207            .in_current_span(),
208        );
209        Ok(ReceiverStream::new(receiver))
210    }
211
212    /// Requests new data using `f` repeatedly,
213    /// writes new data to the cache file and sends it using `sender`.
214    async fn keep_stream_updated<'a, F, Fut, R>(
215        &'a self,
216        cache_file_path: Option<PathBuf>,
217        sender: mpsc::Sender<R>,
218        mut previous_data: R,
219        f: F,
220    ) where
221        F: Fn(&'a Url) -> Fut,
222        Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
223        R: Serialize + DeserializeOwned + PartialEq + Clone,
224    {
225        info!("starting background task for updating data");
226        loop {
227            sleep(self.config.update_interval).await;
228            if sender.is_closed() {
229                info!("data handle dropped, stopping background task");
230                return;
231            }
232            match self.fetch_from_all_urls(true, &f).await {
233                Ok(new_data) => {
234                    if previous_data != new_data {
235                        info!("data changed");
236                        if let Some(cache_file_path) = &cache_file_path {
237                            if let Err(err) = atomic_save_file(cache_file_path, &new_data) {
238                                warn!(?err, ?cache_file_path, "failed to save data to cache file");
239                            }
240                        }
241
242                        previous_data = new_data.clone();
243                        if sender.send(new_data.clone()).await.is_err() {
244                            info!("update handle dropped, stopping background task");
245                            return;
246                        }
247                    }
248                }
249                Err(err) => {
250                    warn!(?err, "failed to fetch data");
251                }
252            }
253        }
254    }
255
256    /// Uses all configured URLs to perform request `f` and handles retrying on error.
257    /// Returns the value once any of the requests succeeds. If all requests fail,
258    /// tries to fetch the data from local cache. If loading from cache also fails,
259    /// it keeps retrying the requests until any of them succeeds.
260    async fn fetch_from_all_urls_or_file<'a, F, Fut, R>(
261        &'a self,
262        cache_file_path: Option<PathBuf>,
263        f: F,
264    ) -> anyhow::Result<R>
265    where
266        F: Fn(&'a Url) -> Fut,
267        Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
268        R: Serialize + DeserializeOwned,
269    {
270        let result = self.fetch_from_all_urls(true, &f).await;
271        match result {
272            Ok(data) => {
273                info!("fetched initial data from history service");
274                if let Some(cache_file_path) = cache_file_path {
275                    if let Err(err) = atomic_save_file::<R>(&cache_file_path, &data) {
276                        warn!(?err, ?cache_file_path, "failed to save data to cache file");
277                    }
278                }
279                return Ok(data);
280            }
281            Err(err) => {
282                warn!(?err, "all requests failed");
283            }
284        }
285
286        if let Some(cache_file_path) = cache_file_path {
287            match load_file::<R>(&cache_file_path) {
288                Ok(Some(data)) => {
289                    info!(
290                        "failed to fetch initial data from history service, \
291                        but fetched last known data from cache"
292                    );
293                    return Ok(data);
294                }
295                Ok(None) => {
296                    info!("no data found in cache");
297                }
298                Err(err) => {
299                    warn!(?err, "failed to fetch data from cache");
300                }
301            }
302        }
303
304        self.fetch_from_all_urls(false, f).await
305    }
306
307    /// Uses all configured URLs to perform request `f` and handles retrying on error.
308    ///
309    /// Returns the value once any of the requests succeeds.
310    /// If `limit_by_update_interval` is true, the total time spent retrying it limited to
311    /// `self.config.update_interval`. If `limit_by_update_interval` is false, the requests
312    /// will be retried indefinitely.
313    async fn fetch_from_all_urls<'a, F, Fut, R>(
314        &'a self,
315        limit_by_update_interval: bool,
316        f: F,
317    ) -> anyhow::Result<R>
318    where
319        F: Fn(&'a Url) -> Fut,
320        Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
321    {
322        if self.config.urls.is_empty() {
323            bail!("no history urls provided");
324        }
325        let mut futures = self
326            .config
327            .urls
328            .iter()
329            .map(|url| {
330                Box::pin(self.fetch_from_single_url_with_retry(limit_by_update_interval, || f(url)))
331            })
332            .collect::<FuturesUnordered<_>>();
333        while let Some(result) = futures.next().await {
334            match result {
335                Ok(output) => return Ok(output),
336                Err(err) => {
337                    warn!("failed to fetch symbols: {:?}", err);
338                }
339            }
340        }
341
342        bail!(
343            "failed to fetch data from any urls ({:?})",
344            self.config.urls
345        );
346    }
347
348    async fn fetch_from_single_url_with_retry<F, Fut, R>(
349        &self,
350        limit_by_update_interval: bool,
351        f: F,
352    ) -> anyhow::Result<R>
353    where
354        F: FnMut() -> Fut,
355        Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
356    {
357        let mut backoff = ExponentialBackoff::<SystemClock>::default();
358        if limit_by_update_interval {
359            backoff.max_elapsed_time = Some(self.config.update_interval);
360        }
361        retry_notify(backoff, f, |e, _| warn!(?e, "operation failed, will retry")).await
362    }
363
364    async fn request_symbols(
365        &self,
366        url: &Url,
367    ) -> Result<Vec<SymbolMetadata>, backoff::Error<anyhow::Error>> {
368        let url = url
369            .join("v1/symbols")
370            .map_err(|err| backoff::Error::permanent(anyhow::Error::from(err)))?;
371
372        let response = self
373            .client
374            .get(url.clone())
375            .send()
376            .await
377            .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))?
378            .backoff_error_for_status()?;
379        let vec = response
380            .json::<Vec<SymbolMetadata>>()
381            .await
382            .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))?;
383        Ok(vec)
384    }
385
386    /// Fetch a partial state snapshot containing data specified in `params`.
387    pub async fn state(&self, params: GetStateParams) -> anyhow::Result<State> {
388        self.fetch_from_all_urls_or_file(self.state_cache_file_path(&params), move |url| {
389            self.request_state(url, params.clone())
390        })
391        .instrument(info_span!("state"))
392        .await
393        .map(|s| s.0)
394    }
395
396    /// Fetch a part of the current state specified by `params`.
397    ///
398    /// Creates a fault-tolerant stream that requests a partial state snapshot
399    /// containing data specified in `params`. It yields new items
400    /// when a change of value occurs.
401    ///
402    /// Returns an error if the initial fetch failed.
403    /// On a successful return, the stream will always contain the initial data that can be fetched
404    /// immediately from the returned stream.
405    /// You should continuously poll the stream to receive updates.
406    pub async fn state_stream(
407        &self,
408        params: GetStateParams,
409    ) -> anyhow::Result<impl Stream<Item = State> + Unpin> {
410        let stream = self
411            .stream(self.state_cache_file_path(&params), move |client, url| {
412                Box::pin(client.request_state(url, params.clone()))
413            })
414            .instrument(info_span!("state_stream"))
415            .await?;
416        Ok(stream.map(|s| s.0))
417    }
418
419    /// Fetch data from /v1/state endpoint without any timeouts or retries.
420    async fn request_state(
421        &self,
422        url: &Url,
423        params: GetStateParams,
424    ) -> Result<StateWithSerde, backoff::Error<anyhow::Error>> {
425        let url = url
426            .join("v1/state")
427            .map_err(|err| backoff::Error::permanent(anyhow::Error::from(err)))?;
428        let access_token = self.config.access_token.as_ref().ok_or_else(|| {
429            backoff::Error::permanent(format_err!("missing access_token in config"))
430        })?;
431        let response = self
432            .client
433            .get(url.clone())
434            .query(&params)
435            .bearer_auth(access_token)
436            .send()
437            .await
438            .map_err(|err| {
439                backoff::Error::transient(
440                    anyhow::Error::from(err).context(format!("failed to fetch state from {url}")),
441                )
442            })?
443            .backoff_error_for_status()?;
444        let bytes = response.bytes().await.map_err(|err| {
445            backoff::Error::transient(
446                anyhow::Error::from(err).context(format!("failed to fetch state from {url}")),
447            )
448        })?;
449        let json = String::from_utf8(bytes.into()).map_err(|err| {
450            backoff::Error::permanent(
451                anyhow::Error::from(err).context(format!("failed to parse state from {url}")),
452            )
453        })?;
454        let state = protobuf_json_mapping::parse_from_str::<State>(&json).map_err(|err| {
455            backoff::Error::permanent(
456                anyhow::Error::from(err).context(format!("failed to parse state from {url}")),
457            )
458        })?;
459        Ok(StateWithSerde(state))
460    }
461}
462
// State wrapper that delegates serialization and deserialization to `protobuf_json_mapping`.
// Needed because the cache/stream machinery requires `Serialize + DeserializeOwned`,
// while protobuf `State` only supports protobuf-JSON conversion.
#[derive(Debug, Clone, PartialEq)]
struct StateWithSerde(State);
466
467impl Serialize for StateWithSerde {
468    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
469    where
470        S: serde::Serializer,
471    {
472        let json = protobuf_json_mapping::print_to_string(&self.0).map_err(S::Error::custom)?;
473        let json_value =
474            serde_json::from_str::<serde_json::Value>(&json).map_err(S::Error::custom)?;
475        json_value.serialize(serializer)
476    }
477}
478
479impl<'de> Deserialize<'de> for StateWithSerde {
480    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
481    where
482        D: serde::Deserializer<'de>,
483    {
484        let json_value = serde_json::Value::deserialize(deserializer)?;
485        let json = serde_json::to_string(&json_value).map_err(D::Error::custom)?;
486        let value = protobuf_json_mapping::parse_from_str(&json).map_err(D::Error::custom)?;
487        Ok(Self(value))
488    }
489}
490
// Extension trait mapping HTTP error statuses onto backoff error categories.
trait BackoffErrorForStatusExt: Sized {
    // Like `reqwest::Response::error_for_status`, but wraps the error as
    // transient or permanent for the backoff retry machinery.
    fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>>;
}
494
495impl BackoffErrorForStatusExt for reqwest::Response {
496    fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>> {
497        let status = self.status();
498        self.error_for_status().map_err(|err| {
499            if status.is_server_error() {
500                backoff::Error::transient(err.into())
501            } else {
502                backoff::Error::permanent(err.into())
503            }
504        })
505    }
506}
507
508fn load_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<Option<T>> {
509    let parent_path = path.parent().context("invalid file path: no parent")?;
510    fs_err::create_dir_all(parent_path)?;
511
512    if !path.try_exists()? {
513        return Ok(None);
514    }
515    let json_data = fs_err::read_to_string(path)?;
516    let data = serde_json::from_str::<T>(&json_data)?;
517    Ok(Some(data))
518}
519
520fn atomic_save_file<T: Serialize>(path: &Path, data: &T) -> anyhow::Result<()> {
521    let parent_path = path.parent().context("invalid file path: no parent")?;
522    fs_err::create_dir_all(parent_path)?;
523
524    let json_data = serde_json::to_string(&data)?;
525    let tmp_path = path.with_extension("tmp");
526    let mut tmp_file = fs_err::File::create(&tmp_path)?;
527    tmp_file.write_all(json_data.as_bytes())?;
528    tmp_file.flush()?;
529    tmp_file.sync_all()?;
530    replace_atomic(&tmp_path, path)?;
531
532    Ok(())
533}