use std::{
    collections::HashMap,
    io::Write,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
    time::Duration,
};

use anyhow::{bail, Context as _};
use arc_swap::ArcSwap;
use atomicwrites::replace_atomic;
use backoff::{exponential::ExponentialBackoff, future::retry_notify, SystemClock};
use futures::{stream::FuturesUnordered, StreamExt};
use pyth_lazer_protocol::{jrpc::SymbolMetadata, PriceFeedId};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{sync::mpsc, time::sleep};
use tracing::{info, warn};
use url::Url;
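
/// Configuration for [`PythLazerHistoryClient`].
///
/// Field defaults match [`Default`]; the duration fields are (de)serialized in
/// humantime format (e.g. `"30s"`).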
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PythLazerHistoryClientConfig {
    /// History service URLs to query (default: the hosted `history.pyth-lazer.dourolabs.app` endpoint).
    #[serde(default = "default_urls")]
    pub urls: Vec<Url>,
    /// How often the background tasks poll for symbol changes (default: 30s).
    #[serde(with = "humantime_serde", default = "default_update_interval")]
    pub update_interval: Duration,
    /// Timeout applied to each HTTP request (default: 15s).
    #[serde(with = "humantime_serde", default = "default_request_timeout")]
    pub request_timeout: Duration,
    /// Optional directory where the last known symbol list is cached on disk.
    pub cache_dir: Option<PathBuf>,
    /// Capacity of the channel returned by `all_symbols_metadata_stream` (default: 1000).
    #[serde(default = "default_channel_capacity")]
    pub channel_capacity: usize,
}

fn default_urls() -> Vec<Url> {
    vec![Url::parse("https://history.pyth-lazer.dourolabs.app/").unwrap()]
}

fn default_update_interval() -> Duration {
    Duration::from_secs(30)
}

fn default_request_timeout() -> Duration {
    Duration::from_secs(15)
}

fn default_channel_capacity() -> usize {
    1000
}

impl Default for PythLazerHistoryClientConfig {
    fn default() -> Self {
        Self {
            urls: default_urls(),
            update_interval: default_update_interval(),
            request_timeout: default_request_timeout(),
            cache_dir: None,
            channel_capacity: default_channel_capacity(),
        }
    }
}
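
/// Client for fetching symbol metadata from the Pyth Lazer history service,
/// with optional on-disk caching and background refresh.
///
/// Illustrative usage (a sketch: the surrounding async runtime and error
/// handling are assumed, not part of this module):
///
/// ```ignore
/// let client = PythLazerHistoryClient::new(PythLazerHistoryClientConfig::default());
/// let handle = client.all_symbols_metadata_fault_tolerant_handle().await;
/// for (id, symbol) in handle.symbols().iter() {
///     println!("{id:?}: {symbol:?}");
/// }
/// ```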
#[derive(Debug, Clone)]
pub struct PythLazerHistoryClient {
    config: Arc<PythLazerHistoryClientConfig>,
    client: reqwest::Client,
}

impl PythLazerHistoryClient {
    pub fn new(config: PythLazerHistoryClientConfig) -> Self {
        Self {
            client: reqwest::Client::builder()
                .timeout(config.request_timeout)
                .build()
                .expect("failed to initialize reqwest"),
            config: Arc::new(config),
        }
    }

    fn symbols_cache_file_path(&self) -> Option<PathBuf> {
        self.config
            .cache_dir
            .as_ref()
            .map(|path| path.join("symbols_v1.json"))
    }
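
    /// Fetches the current symbol metadata once, falling back to the on-disk
    /// cache (if `cache_dir` is configured) when the history service cannot be
    /// reached.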
    pub async fn all_symbols_metadata(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
        self.fetch_symbols_initial().await
    }
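
    /// Fetches the symbol metadata and returns a handle that is refreshed in the
    /// background every `update_interval`. The background task stops once all
    /// clones of the handle have been dropped. Fails if the initial fetch
    /// (including the cache fallback) fails.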
    pub async fn all_symbols_metadata_handle(&self) -> anyhow::Result<SymbolMetadataHandle> {
        let symbols = Arc::new(
            self.fetch_symbols_initial()
                .await?
                .into_iter()
                .map(|f| (f.pyth_lazer_id, f))
                .collect::<HashMap<_, _>>(),
        );
        let previous_symbols = symbols.clone();
        let handle = Arc::new(ArcSwap::new(symbols));
        let client = self.clone();
        let weak_handle = Arc::downgrade(&handle);
        tokio::spawn(async move {
            client
                .update_symbols_handle(weak_handle, previous_symbols)
                .await;
        });
        Ok(SymbolMetadataHandle(handle))
    }
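
    /// Like [`Self::all_symbols_metadata_handle`], but never fails: if the
    /// initial fetch (and cache fallback) fails, a warning is logged and the
    /// handle starts out empty until the background task succeeds.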
    pub async fn all_symbols_metadata_fault_tolerant_handle(&self) -> SymbolMetadataHandle {
        let initial_result = self.fetch_symbols_initial().await;
        let symbols = match initial_result {
            Ok(data) => data
                .into_iter()
                .map(|f| (f.pyth_lazer_id, f))
                .collect::<HashMap<_, _>>(),
            Err(err) => {
                warn!(
                    ?err,
                    "failed to fetch symbols, proceeding with empty symbol list"
                );
                HashMap::new()
            }
        };
        let symbols = Arc::new(symbols);
        let previous_symbols = symbols.clone();
        let handle = Arc::new(ArcSwap::new(symbols));
        let weak_handle = Arc::downgrade(&handle);
        let client = self.clone();
        tokio::spawn(async move {
            client
                .update_symbols_handle(weak_handle, previous_symbols)
                .await;
        });
        SymbolMetadataHandle(handle)
    }
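
    /// Fetches the symbol metadata and returns a channel that yields the initial
    /// snapshot immediately and a new `Vec<SymbolMetadata>` whenever the list
    /// changes. The background task stops when the receiver is dropped. Fails if
    /// `channel_capacity` is 0 or the initial fetch fails.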
    pub async fn all_symbols_metadata_stream(
        &self,
    ) -> anyhow::Result<mpsc::Receiver<Vec<SymbolMetadata>>> {
        if self.config.channel_capacity == 0 {
            bail!("channel_capacity cannot be 0");
        }
        let symbols = self.fetch_symbols_initial().await?;
        let (sender, receiver) = mpsc::channel(self.config.channel_capacity);

        let previous_symbols = symbols.clone();
        sender
            .send(symbols)
            .await
            .expect("send to new channel failed");
        let client = self.clone();
        tokio::spawn(async move {
            client.update_symbols_stream(sender, previous_symbols).await;
        });
        Ok(receiver)
    }

    async fn update_symbols_handle(
        &self,
        handle: Weak<ArcSwap<HashMap<PriceFeedId, SymbolMetadata>>>,
        mut previous_symbols: Arc<HashMap<PriceFeedId, SymbolMetadata>>,
    ) {
        info!("starting background task for updating symbols");
        loop {
            sleep(self.config.update_interval).await;
            if handle.upgrade().is_none() {
                info!("symbols handle dropped, stopping background task");
                return;
            }
            match self.fetch_symbols().await {
                Ok(new_symbols) => {
                    let new_symbols = new_symbols
                        .into_iter()
                        .map(|f| (f.pyth_lazer_id, f))
                        .collect::<HashMap<_, _>>();
                    if *previous_symbols != new_symbols {
                        let Some(handle) = handle.upgrade() else {
                            info!("symbols handle dropped, stopping background task");
                            return;
                        };
                        info!("symbols changed");
                        if let Some(cache_file_path) = self.symbols_cache_file_path() {
                            if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) {
                                warn!(?err, ?cache_file_path, "failed to save data to cache file");
                            }
                        }
                        let new_symbols = Arc::new(new_symbols);
                        previous_symbols = new_symbols.clone();
                        handle.store(new_symbols);
                    }
                }
                Err(err) => {
                    warn!(?err, "failed to fetch symbols");
                }
            }
        }
    }

    async fn update_symbols_stream(
        &self,
        handle: mpsc::Sender<Vec<SymbolMetadata>>,
        mut previous_symbols: Vec<SymbolMetadata>,
    ) {
        info!("starting background task for updating symbols");
        loop {
            sleep(self.config.update_interval).await;
            if handle.is_closed() {
                info!("symbols channel closed, stopping background task");
                return;
            }
            match self.fetch_symbols().await {
                Ok(new_symbols) => {
                    if *previous_symbols != new_symbols {
                        info!("symbols changed");
                        if let Some(cache_file_path) = self.symbols_cache_file_path() {
                            if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) {
                                warn!(?err, ?cache_file_path, "failed to save data to cache file");
                            }
                        }
                        previous_symbols = new_symbols.clone();
                        if handle.send(new_symbols).await.is_err() {
                            info!("symbols channel closed, stopping background task");
                            return;
                        }
                    }
                }
                Err(err) => {
                    warn!(?err, "failed to fetch symbols");
                }
            }
        }
    }
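
    // Fetches symbols and, on success, persists them to the cache file; on
    // failure, falls back to the last cached copy if one is available.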
    async fn fetch_symbols_initial(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
        let result = self.fetch_symbols().await;
        match result {
            Ok(data) => {
                info!("fetched initial symbols from history service");
                if let Some(cache_file_path) = self.symbols_cache_file_path() {
                    if let Err(err) = atomic_save_file(&cache_file_path, &data) {
                        warn!(?err, ?cache_file_path, "failed to save data to cache file");
                    }
                }
                Ok(data)
            }
            Err(err) => match self.symbols_cache_file_path() {
                Some(cache_file_path) => match load_file::<Vec<SymbolMetadata>>(&cache_file_path) {
                    Ok(Some(data)) => {
                        info!(?err, "failed to fetch initial symbols from history service, but fetched last known symbols from cache");
                        Ok(data)
                    }
                    Ok(None) => Err(err),
                    Err(cache_err) => {
                        warn!(?cache_err, "failed to fetch data from cache");
                        Err(err)
                    }
                },
                None => Err(err),
            },
        }
    }
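
    // Queries all configured history URLs concurrently and returns the first
    // successful response; fails only if every URL fails.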
    async fn fetch_symbols(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
        if self.config.urls.is_empty() {
            bail!("no history urls provided");
        }
        let mut futures = self
            .config
            .urls
            .iter()
            .map(|url| Box::pin(self.fetch_symbols_single(url)))
            .collect::<FuturesUnordered<_>>();
        while let Some(result) = futures.next().await {
            match result {
                Ok(output) => return Ok(output),
                Err(err) => {
                    warn!("failed to fetch symbols: {:?}", err);
                }
            }
        }

        bail!(
            "failed to fetch symbols from any urls ({:?})",
            self.config.urls
        );
    }
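
    // Fetches symbols from a single history URL, retrying transient failures
    // (e.g. network errors and 5xx responses) with exponential backoff for at
    // most `update_interval`.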
    async fn fetch_symbols_single(&self, url: &Url) -> anyhow::Result<Vec<SymbolMetadata>> {
        let url = url.join("v1/symbols")?;
        retry_notify(
            ExponentialBackoff::<SystemClock> {
                max_elapsed_time: Some(self.config.update_interval),
                ..Default::default()
            },
            || async {
                let response = self
                    .client
                    .get(url.clone())
                    .send()
                    .await
                    .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))?
                    .backoff_error_for_status()?;
                response
                    .json::<Vec<SymbolMetadata>>()
                    .await
                    .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))
            },
            |e, _| warn!("failed to fetch symbols from {} (will retry): {:?}", url, e),
        )
        .await
    }
}
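
/// A cheaply clonable, lock-free view of the latest symbol metadata, kept up to
/// date in the background by [`PythLazerHistoryClient`].
///
/// Illustrative lookup (a sketch; `feed_id` stands in for a real [`PriceFeedId`]):
///
/// ```ignore
/// let metadata = handle.symbols().get(&feed_id).cloned();
/// ```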
#[derive(Debug, Clone)]
pub struct SymbolMetadataHandle(Arc<ArcSwap<HashMap<PriceFeedId, SymbolMetadata>>>);

impl SymbolMetadataHandle {
    pub fn symbols(&self) -> arc_swap::Guard<Arc<HashMap<PriceFeedId, SymbolMetadata>>> {
        self.0.load()
    }

    pub fn new_for_test(data: HashMap<PriceFeedId, SymbolMetadata>) -> Self {
        Self(Arc::new(ArcSwap::new(Arc::new(data))))
    }
}
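
// Classifies HTTP status errors for `backoff`: 5xx responses are treated as
// transient (retried), everything else as permanent.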
trait BackoffErrorForStatusExt: Sized {
    fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>>;
}

impl BackoffErrorForStatusExt for reqwest::Response {
    fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>> {
        let status = self.status();
        self.error_for_status().map_err(|err| {
            if status.is_server_error() {
                backoff::Error::transient(err.into())
            } else {
                backoff::Error::permanent(err.into())
            }
        })
    }
}
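
// Loads a JSON value from the cache file, returning `Ok(None)` if the file does
// not exist yet.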
fn load_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<Option<T>> {
    let parent_path = path.parent().context("invalid file path: no parent")?;
    fs_err::create_dir_all(parent_path)?;

    if !path.try_exists()? {
        return Ok(None);
    }
    let json_data = fs_err::read_to_string(path)?;
    let data = serde_json::from_str::<T>(&json_data)?;
    Ok(Some(data))
}
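
// Saves a value as JSON by writing to a temporary file, syncing it, and
// atomically replacing the destination, so readers never see a partially
// written cache file.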
fn atomic_save_file(path: &Path, data: &impl Serialize) -> anyhow::Result<()> {
    let parent_path = path.parent().context("invalid file path: no parent")?;
    fs_err::create_dir_all(parent_path)?;

    let json_data = serde_json::to_string(&data)?;
    let tmp_path = path.with_extension("tmp");
    let mut tmp_file = fs_err::File::create(&tmp_path)?;
    tmp_file.write_all(json_data.as_bytes())?;
    tmp_file.flush()?;
    tmp_file.sync_all()?;
    replace_atomic(&tmp_path, path)?;

    Ok(())
}