1use {
2 anyhow::{bail, format_err, Context as _},
3 atomicwrites::replace_atomic,
4 backoff::{exponential::ExponentialBackoff, future::retry_notify, SystemClock},
5 futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt},
6 pyth_lazer_protocol::jrpc::SymbolMetadata,
7 pyth_lazer_publisher_sdk::state::State,
8 serde::{
9 de::{DeserializeOwned, Error as _},
10 ser::Error as _,
11 Deserialize, Serialize,
12 },
13 std::{
14 future::Future,
15 io::Write,
16 path::{Path, PathBuf},
17 sync::Arc,
18 time::Duration,
19 },
20 tokio::{sync::mpsc, time::sleep},
21 tokio_stream::wrappers::ReceiverStream,
22 tracing::{info, info_span, warn, Instrument},
23 url::Url,
24};
25
/// Configuration for [`PythLazerHistoryClient`].
///
/// Fields annotated with `default = "..."` fall back to the matching `default_*`
/// function when they are absent from the serialized form.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PythLazerHistoryClientConfig {
    /// Base URLs of the history service. Requests are issued against all of them
    /// concurrently and the first successful response is used.
    #[serde(default = "default_urls")]
    pub urls: Vec<Url>,
    /// Pause between two refreshes performed by the background stream task; also
    /// used as the retry time budget of a single refresh. Parsed via `humantime_serde`.
    #[serde(with = "humantime_serde", default = "default_update_interval")]
    pub update_interval: Duration,
    /// Timeout configured on the underlying `reqwest` client. Parsed via `humantime_serde`.
    #[serde(with = "humantime_serde", default = "default_request_timeout")]
    pub request_timeout: Duration,
    /// Directory in which fetched data is cached as JSON files. `None` disables
    /// the on-disk cache.
    pub cache_dir: Option<PathBuf>,
    /// Capacity of the channel that backs the streams returned by the `*_stream`
    /// methods. A value of 0 is rejected when a stream is created.
    #[serde(default = "default_channel_capacity")]
    pub channel_capacity: usize,
    /// Bearer token sent with state requests. State requests fail with a
    /// permanent error when this is `None`.
    pub access_token: Option<String>,
}
49
50fn default_urls() -> Vec<Url> {
51 vec![Url::parse("https://history.pyth-lazer.dourolabs.app/").unwrap()]
52}
53
/// Default value of [`PythLazerHistoryClientConfig::update_interval`] (30 seconds).
fn default_update_interval() -> Duration {
    const DEFAULT_UPDATE_INTERVAL_SECS: u64 = 30;
    Duration::from_secs(DEFAULT_UPDATE_INTERVAL_SECS)
}
57
/// Default value of [`PythLazerHistoryClientConfig::request_timeout`] (15 seconds).
fn default_request_timeout() -> Duration {
    const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)
}
61
/// Default value of [`PythLazerHistoryClientConfig::channel_capacity`].
fn default_channel_capacity() -> usize {
    const DEFAULT_CHANNEL_CAPACITY: usize = 1000;
    DEFAULT_CHANNEL_CAPACITY
}
65
66impl Default for PythLazerHistoryClientConfig {
67 fn default() -> Self {
68 Self {
69 urls: default_urls(),
70 update_interval: default_update_interval(),
71 request_timeout: default_request_timeout(),
72 cache_dir: None,
73 channel_capacity: default_channel_capacity(),
74 access_token: None,
75 }
76 }
77}
78
/// Client for the Pyth Lazer history service.
///
/// Holds the shared configuration (behind an `Arc`) and the `reqwest` client
/// used for all HTTP requests.
#[derive(Debug, Clone)]
pub struct PythLazerHistoryClient {
    // Shared so that clones of the client (e.g. the one moved into the background
    // stream task) do not copy the configuration.
    config: Arc<PythLazerHistoryClientConfig>,
    // HTTP client built with the configured request timeout.
    client: reqwest::Client,
}
85
/// Parameters of a state request.
///
/// The struct is serialized into the query string of the `v1/state` request, and
/// the combination of flags is also encoded in the name of the state cache file.
/// Every flag defaults to `false`.
// NOTE(review): each flag presumably selects which section of the state the
// service returns (and `all` presumably selects everything) — confirm against
// the history service API.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct GetStateParams {
    #[serde(default)]
    pub all: bool,
    #[serde(default)]
    pub publishers: bool,
    #[serde(default)]
    pub feeds: bool,
    #[serde(default)]
    pub governance_sources: bool,
    #[serde(default)]
    pub feature_flags: bool,
}
100
101impl PythLazerHistoryClient {
102 pub fn new(config: PythLazerHistoryClientConfig) -> Self {
103 Self {
104 client: reqwest::Client::builder()
105 .timeout(config.request_timeout)
106 .build()
107 .expect("failed to initialize reqwest"),
108 config: Arc::new(config),
109 }
110 }
111
112 fn symbols_cache_file_path(&self) -> Option<PathBuf> {
113 self.config
114 .cache_dir
115 .as_ref()
116 .map(|path| path.join("symbols_v1.json"))
117 }
118
119 fn state_cache_file_path(&self, params: &GetStateParams) -> Option<PathBuf> {
120 let GetStateParams {
121 all,
122 publishers,
123 feeds,
124 governance_sources,
125 feature_flags,
126 } = params;
127
128 self.config.cache_dir.as_ref().map(|path| {
129 path.join(format!(
130 "state_{}{}{}{}{}_v1.json",
131 *all as u8,
132 *publishers as u8,
133 *feeds as u8,
134 *governance_sources as u8,
135 *feature_flags as u8,
136 ))
137 })
138 }
139
140 pub async fn all_symbols_metadata(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
142 self.fetch_from_all_urls_or_file(self.symbols_cache_file_path(), |url| {
143 self.request_symbols(url)
144 })
145 .instrument(info_span!("all_symbols_metadata"))
146 .await
147 }
148
149 pub async fn all_symbols_metadata_stream(
157 &self,
158 ) -> anyhow::Result<impl Stream<Item = Vec<SymbolMetadata>> + Unpin> {
159 self.stream(self.symbols_cache_file_path(), |client, url| {
160 Box::pin(client.request_symbols(url))
161 })
162 .instrument(info_span!("all_symbols_metadata_stream"))
163 .await
164 }
165
166 async fn stream<F, R>(
174 &self,
175 cache_file_path: Option<PathBuf>,
176 f: F,
177 ) -> anyhow::Result<impl Stream<Item = R> + Unpin>
178 where
179 for<'a> F: Fn(&'a Self, &'a Url) -> BoxFuture<'a, Result<R, backoff::Error<anyhow::Error>>>
180 + Send
181 + Sync
182 + 'static,
183 R: Clone + Serialize + DeserializeOwned + PartialEq + Send + Sync + 'static,
184 {
185 if self.config.channel_capacity == 0 {
186 bail!("channel_capacity cannot be 0");
187 }
188 let symbols = self
189 .fetch_from_all_urls_or_file(cache_file_path.clone(), |url| f(self, url))
190 .await?;
191 let (sender, receiver) = mpsc::channel(self.config.channel_capacity);
192
193 let previous_symbols = symbols.clone();
194 sender
195 .send(symbols)
196 .await
197 .expect("send to new channel failed");
198 let client = self.clone();
199 tokio::spawn(
200 async move {
201 client
202 .keep_stream_updated(cache_file_path, sender, previous_symbols, |url| {
203 f(&client, url)
204 })
205 .await;
206 }
207 .in_current_span(),
208 );
209 Ok(ReceiverStream::new(receiver))
210 }
211
212 async fn keep_stream_updated<'a, F, Fut, R>(
215 &'a self,
216 cache_file_path: Option<PathBuf>,
217 sender: mpsc::Sender<R>,
218 mut previous_data: R,
219 f: F,
220 ) where
221 F: Fn(&'a Url) -> Fut,
222 Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
223 R: Serialize + DeserializeOwned + PartialEq + Clone,
224 {
225 info!("starting background task for updating data");
226 loop {
227 sleep(self.config.update_interval).await;
228 if sender.is_closed() {
229 info!("data handle dropped, stopping background task");
230 return;
231 }
232 match self.fetch_from_all_urls(true, &f).await {
233 Ok(new_data) => {
234 if previous_data != new_data {
235 info!("data changed");
236 if let Some(cache_file_path) = &cache_file_path {
237 if let Err(err) = atomic_save_file(cache_file_path, &new_data) {
238 warn!(?err, ?cache_file_path, "failed to save data to cache file");
239 }
240 }
241
242 previous_data = new_data.clone();
243 if sender.send(new_data.clone()).await.is_err() {
244 info!("update handle dropped, stopping background task");
245 return;
246 }
247 }
248 }
249 Err(err) => {
250 warn!(?err, "failed to fetch data");
251 }
252 }
253 }
254 }
255
256 async fn fetch_from_all_urls_or_file<'a, F, Fut, R>(
261 &'a self,
262 cache_file_path: Option<PathBuf>,
263 f: F,
264 ) -> anyhow::Result<R>
265 where
266 F: Fn(&'a Url) -> Fut,
267 Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
268 R: Serialize + DeserializeOwned,
269 {
270 let result = self.fetch_from_all_urls(true, &f).await;
271 match result {
272 Ok(data) => {
273 info!("fetched initial data from history service");
274 if let Some(cache_file_path) = cache_file_path {
275 if let Err(err) = atomic_save_file::<R>(&cache_file_path, &data) {
276 warn!(?err, ?cache_file_path, "failed to save data to cache file");
277 }
278 }
279 return Ok(data);
280 }
281 Err(err) => {
282 warn!(?err, "all requests failed");
283 }
284 }
285
286 if let Some(cache_file_path) = cache_file_path {
287 match load_file::<R>(&cache_file_path) {
288 Ok(Some(data)) => {
289 info!(
290 "failed to fetch initial data from history service, \
291 but fetched last known data from cache"
292 );
293 return Ok(data);
294 }
295 Ok(None) => {
296 info!("no data found in cache");
297 }
298 Err(err) => {
299 warn!(?err, "failed to fetch data from cache");
300 }
301 }
302 }
303
304 self.fetch_from_all_urls(false, f).await
305 }
306
307 async fn fetch_from_all_urls<'a, F, Fut, R>(
314 &'a self,
315 limit_by_update_interval: bool,
316 f: F,
317 ) -> anyhow::Result<R>
318 where
319 F: Fn(&'a Url) -> Fut,
320 Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
321 {
322 if self.config.urls.is_empty() {
323 bail!("no history urls provided");
324 }
325 let mut futures = self
326 .config
327 .urls
328 .iter()
329 .map(|url| {
330 Box::pin(self.fetch_from_single_url_with_retry(limit_by_update_interval, || f(url)))
331 })
332 .collect::<FuturesUnordered<_>>();
333 while let Some(result) = futures.next().await {
334 match result {
335 Ok(output) => return Ok(output),
336 Err(err) => {
337 warn!("failed to fetch symbols: {:?}", err);
338 }
339 }
340 }
341
342 bail!(
343 "failed to fetch data from any urls ({:?})",
344 self.config.urls
345 );
346 }
347
348 async fn fetch_from_single_url_with_retry<F, Fut, R>(
349 &self,
350 limit_by_update_interval: bool,
351 f: F,
352 ) -> anyhow::Result<R>
353 where
354 F: FnMut() -> Fut,
355 Fut: Future<Output = Result<R, backoff::Error<anyhow::Error>>>,
356 {
357 let mut backoff = ExponentialBackoff::<SystemClock>::default();
358 if limit_by_update_interval {
359 backoff.max_elapsed_time = Some(self.config.update_interval);
360 }
361 retry_notify(backoff, f, |e, _| warn!(?e, "operation failed, will retry")).await
362 }
363
364 async fn request_symbols(
365 &self,
366 url: &Url,
367 ) -> Result<Vec<SymbolMetadata>, backoff::Error<anyhow::Error>> {
368 let url = url
369 .join("v1/symbols")
370 .map_err(|err| backoff::Error::permanent(anyhow::Error::from(err)))?;
371
372 let response = self
373 .client
374 .get(url.clone())
375 .send()
376 .await
377 .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))?
378 .backoff_error_for_status()?;
379 let vec = response
380 .json::<Vec<SymbolMetadata>>()
381 .await
382 .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))?;
383 Ok(vec)
384 }
385
386 pub async fn state(&self, params: GetStateParams) -> anyhow::Result<State> {
388 self.fetch_from_all_urls_or_file(self.state_cache_file_path(¶ms), move |url| {
389 self.request_state(url, params.clone())
390 })
391 .instrument(info_span!("state"))
392 .await
393 .map(|s| s.0)
394 }
395
396 pub async fn state_stream(
407 &self,
408 params: GetStateParams,
409 ) -> anyhow::Result<impl Stream<Item = State> + Unpin> {
410 let stream = self
411 .stream(self.state_cache_file_path(¶ms), move |client, url| {
412 Box::pin(client.request_state(url, params.clone()))
413 })
414 .instrument(info_span!("state_stream"))
415 .await?;
416 Ok(stream.map(|s| s.0))
417 }
418
419 async fn request_state(
421 &self,
422 url: &Url,
423 params: GetStateParams,
424 ) -> Result<StateWithSerde, backoff::Error<anyhow::Error>> {
425 let url = url
426 .join("v1/state")
427 .map_err(|err| backoff::Error::permanent(anyhow::Error::from(err)))?;
428 let access_token = self.config.access_token.as_ref().ok_or_else(|| {
429 backoff::Error::permanent(format_err!("missing access_token in config"))
430 })?;
431 let response = self
432 .client
433 .get(url.clone())
434 .query(¶ms)
435 .bearer_auth(access_token)
436 .send()
437 .await
438 .map_err(|err| {
439 backoff::Error::transient(
440 anyhow::Error::from(err).context(format!("failed to fetch state from {url}")),
441 )
442 })?
443 .backoff_error_for_status()?;
444 let bytes = response.bytes().await.map_err(|err| {
445 backoff::Error::transient(
446 anyhow::Error::from(err).context(format!("failed to fetch state from {url}")),
447 )
448 })?;
449 let json = String::from_utf8(bytes.into()).map_err(|err| {
450 backoff::Error::permanent(
451 anyhow::Error::from(err).context(format!("failed to parse state from {url}")),
452 )
453 })?;
454 let state = protobuf_json_mapping::parse_from_str::<State>(&json).map_err(|err| {
455 backoff::Error::permanent(
456 anyhow::Error::from(err).context(format!("failed to parse state from {url}")),
457 )
458 })?;
459 Ok(StateWithSerde(state))
460 }
461}
462
/// Newtype around [`State`] that implements `Serialize`/`Deserialize` through
/// the protobuf JSON mapping, so that state values can pass through the generic
/// fetch, cache and stream helpers, which require serde support.
#[derive(Debug, Clone, PartialEq)]
struct StateWithSerde(State);
466
467impl Serialize for StateWithSerde {
468 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
469 where
470 S: serde::Serializer,
471 {
472 let json = protobuf_json_mapping::print_to_string(&self.0).map_err(S::Error::custom)?;
473 let json_value =
474 serde_json::from_str::<serde_json::Value>(&json).map_err(S::Error::custom)?;
475 json_value.serialize(serializer)
476 }
477}
478
479impl<'de> Deserialize<'de> for StateWithSerde {
480 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
481 where
482 D: serde::Deserializer<'de>,
483 {
484 let json_value = serde_json::Value::deserialize(deserializer)?;
485 let json = serde_json::to_string(&json_value).map_err(D::Error::custom)?;
486 let value = protobuf_json_mapping::parse_from_str(&json).map_err(D::Error::custom)?;
487 Ok(Self(value))
488 }
489}
490
/// Extension trait that turns an HTTP error status into a `backoff` error, so
/// the retry logic can tell retryable failures from final ones.
trait BackoffErrorForStatusExt: Sized {
    /// Returns `self` unchanged on a success status, otherwise a `backoff::Error`
    /// describing the failed status.
    fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>>;
}
494
495impl BackoffErrorForStatusExt for reqwest::Response {
496 fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>> {
497 let status = self.status();
498 self.error_for_status().map_err(|err| {
499 if status.is_server_error() {
500 backoff::Error::transient(err.into())
501 } else {
502 backoff::Error::permanent(err.into())
503 }
504 })
505 }
506}
507
508fn load_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<Option<T>> {
509 let parent_path = path.parent().context("invalid file path: no parent")?;
510 fs_err::create_dir_all(parent_path)?;
511
512 if !path.try_exists()? {
513 return Ok(None);
514 }
515 let json_data = fs_err::read_to_string(path)?;
516 let data = serde_json::from_str::<T>(&json_data)?;
517 Ok(Some(data))
518}
519
520fn atomic_save_file<T: Serialize>(path: &Path, data: &T) -> anyhow::Result<()> {
521 let parent_path = path.parent().context("invalid file path: no parent")?;
522 fs_err::create_dir_all(parent_path)?;
523
524 let json_data = serde_json::to_string(&data)?;
525 let tmp_path = path.with_extension("tmp");
526 let mut tmp_file = fs_err::File::create(&tmp_path)?;
527 tmp_file.write_all(json_data.as_bytes())?;
528 tmp_file.flush()?;
529 tmp_file.sync_all()?;
530 replace_atomic(&tmp_path, path)?;
531
532 Ok(())
533}