1use std::{
2 collections::HashMap,
3 io::Write,
4 path::{Path, PathBuf},
5 sync::{Arc, Weak},
6 time::Duration,
7};
8
9use anyhow::{bail, Context as _};
10use arc_swap::ArcSwap;
11use atomicwrites::replace_atomic;
12use backoff::{exponential::ExponentialBackoff, future::retry_notify, SystemClock};
13use futures::{stream::FuturesUnordered, StreamExt};
14use pyth_lazer_protocol::{jrpc::SymbolMetadata, PriceFeedId};
15use serde::{de::DeserializeOwned, Deserialize, Serialize};
16use tokio::{sync::mpsc, time::sleep};
17use tracing::{info, warn};
18use url::Url;
19
20const DEFAULT_URLS: &[&str] = &["https://history.pyth-lazer.dourolabs.app/"];
21const DEFAULT_UPDATE_INTERVAL: Duration = Duration::from_secs(30);
22const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
23
24#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
26pub struct PythLazerHistoryClientConfig {
27 #[serde(default = "default_urls")]
29 pub urls: Vec<Url>,
30 #[serde(with = "humantime_serde", default = "default_update_interval")]
33 pub update_interval: Duration,
34 #[serde(with = "humantime_serde", default = "default_request_timeout")]
36 pub request_timeout: Duration,
37 pub cache_dir: Option<PathBuf>,
39 #[serde(default = "default_channel_capacity")]
41 pub channel_capacity: usize,
42}
43
44fn default_urls() -> Vec<Url> {
45 DEFAULT_URLS
46 .iter()
47 .map(|url| Url::parse(url).unwrap())
48 .collect()
49}
50
51fn default_update_interval() -> Duration {
52 Duration::from_secs(30)
53}
54
55fn default_request_timeout() -> Duration {
56 Duration::from_secs(15)
57}
58
59fn default_channel_capacity() -> usize {
60 1000
61}
62
63impl Default for PythLazerHistoryClientConfig {
64 fn default() -> Self {
65 Self {
66 urls: default_urls(),
67 update_interval: default_update_interval(),
68 request_timeout: default_request_timeout(),
69 cache_dir: None,
70 channel_capacity: default_channel_capacity(),
71 }
72 }
73}
74
/// Client for the history service that provides symbol metadata.
///
/// Cloning is cheap relative to construction: the configuration is shared
/// behind an `Arc`, and background tasks spawned by this type run on a clone.
#[derive(Debug, Clone)]
pub struct PythLazerHistoryClient {
    /// Shared, immutable client configuration.
    config: Arc<PythLazerHistoryClientConfig>,
    /// HTTP client used for all requests to the history service.
    client: reqwest::Client,
}
81
82impl PythLazerHistoryClient {
83 pub fn new(config: PythLazerHistoryClientConfig) -> Self {
84 Self {
85 config: Arc::new(config),
86 client: reqwest::Client::builder()
87 .timeout(DEFAULT_REQUEST_TIMEOUT)
88 .build()
89 .expect("failed to initialize reqwest"),
90 }
91 }
92
93 fn symbols_cache_file_path(&self) -> Option<PathBuf> {
94 self.config
95 .cache_dir
96 .as_ref()
97 .map(|path| path.join("symbols_v1.json"))
98 }
99
100 pub async fn all_symbols_metadata(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
102 self.fetch_symbols_initial().await
103 }
104
105 pub async fn all_symbols_metadata_handle(&self) -> anyhow::Result<SymbolMetadataHandle> {
110 let symbols = Arc::new(
111 self.fetch_symbols_initial()
112 .await?
113 .into_iter()
114 .map(|f| (f.pyth_lazer_id, f))
115 .collect::<HashMap<_, _>>(),
116 );
117 let previous_symbols = symbols.clone();
118 let handle = Arc::new(ArcSwap::new(symbols));
119 let client = self.clone();
120 let weak_handle = Arc::downgrade(&handle);
121 tokio::spawn(async move {
122 client
123 .update_symbols_handle(weak_handle, previous_symbols)
124 .await;
125 });
126 Ok(SymbolMetadataHandle(handle))
127 }
128
129 pub async fn all_symbols_metadata_fault_tolerant_handle(&self) -> SymbolMetadataHandle {
134 let initial_result = self.fetch_symbols_initial().await;
135 let symbols = match initial_result {
136 Ok(data) => data
137 .into_iter()
138 .map(|f| (f.pyth_lazer_id, f))
139 .collect::<HashMap<_, _>>(),
140 Err(err) => {
141 warn!(
142 ?err,
143 "failed to fetch symbols, proceeding with empty symbol list"
144 );
145 HashMap::new()
146 }
147 };
148 let symbols = Arc::new(symbols);
149 let previous_symbols = symbols.clone();
150 let handle = Arc::new(ArcSwap::new(symbols));
151 let weak_handle = Arc::downgrade(&handle);
152 let client = self.clone();
153 tokio::spawn(async move {
154 client
155 .update_symbols_handle(weak_handle, previous_symbols)
156 .await;
157 });
158 SymbolMetadataHandle(handle)
159 }
160
161 pub async fn all_symbols_metadata_stream(
168 &self,
169 ) -> anyhow::Result<mpsc::Receiver<Vec<SymbolMetadata>>> {
170 if self.config.channel_capacity == 0 {
171 bail!("channel_capacity cannot be 0");
172 }
173 let symbols = self.fetch_symbols_initial().await?;
174 let (sender, receiver) = mpsc::channel(self.config.channel_capacity);
175
176 let previous_symbols = symbols.clone();
177 sender
178 .send(symbols)
179 .await
180 .expect("send to new channel failed");
181 let client = self.clone();
182 tokio::spawn(async move {
183 client.update_symbols_stream(sender, previous_symbols).await;
184 });
185 Ok(receiver)
186 }
187
188 async fn update_symbols_handle(
189 &self,
190 handle: Weak<ArcSwap<HashMap<PriceFeedId, SymbolMetadata>>>,
191 mut previous_symbols: Arc<HashMap<PriceFeedId, SymbolMetadata>>,
192 ) {
193 info!("starting background task for updating symbols");
194 loop {
195 sleep(DEFAULT_UPDATE_INTERVAL).await;
196 if handle.upgrade().is_none() {
197 info!("symbols handle dropped, stopping background task");
198 return;
199 }
200 match self.fetch_symbols().await {
201 Ok(new_symbols) => {
202 let new_symbols = new_symbols
203 .into_iter()
204 .map(|f| (f.pyth_lazer_id, f))
205 .collect::<HashMap<_, _>>();
206 if *previous_symbols != new_symbols {
207 let Some(handle) = handle.upgrade() else {
208 info!("symbols handle dropped, stopping background task");
209 return;
210 };
211 info!("symbols changed");
212 if let Some(cache_file_path) = self.symbols_cache_file_path() {
213 if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) {
214 warn!(?err, ?cache_file_path, "failed to save data to cache file");
215 }
216 }
217 let new_symbols = Arc::new(new_symbols);
218 previous_symbols = new_symbols.clone();
219 handle.store(new_symbols);
220 }
221 }
222 Err(err) => {
223 warn!(?err, "failed to fetch symbols");
224 }
225 }
226 }
227 }
228
229 async fn update_symbols_stream(
230 &self,
231 handle: mpsc::Sender<Vec<SymbolMetadata>>,
232 mut previous_symbols: Vec<SymbolMetadata>,
233 ) {
234 info!("starting background task for updating symbols");
235 loop {
236 sleep(DEFAULT_UPDATE_INTERVAL).await;
237 if handle.is_closed() {
238 info!("symbols channel closed, stopping background task");
239 return;
240 }
241 match self.fetch_symbols().await {
242 Ok(new_symbols) => {
243 if *previous_symbols != new_symbols {
244 info!("symbols changed");
245 if let Some(cache_file_path) = self.symbols_cache_file_path() {
246 if let Err(err) = atomic_save_file(&cache_file_path, &new_symbols) {
247 warn!(?err, ?cache_file_path, "failed to save data to cache file");
248 }
249 }
250 previous_symbols = new_symbols.clone();
251 if handle.send(new_symbols).await.is_err() {
252 info!("symbols channel closed, stopping background task");
253 return;
254 }
255 }
256 }
257 Err(err) => {
258 warn!(?err, "failed to fetch symbols");
259 }
260 }
261 }
262 }
263
264 async fn fetch_symbols_initial(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
265 let result = self.fetch_symbols().await;
266 match result {
267 Ok(data) => {
268 info!("fetched initial symbols from history service");
269 if let Some(cache_file_path) = self.symbols_cache_file_path() {
270 if let Err(err) = atomic_save_file(&cache_file_path, &data) {
271 warn!(?err, ?cache_file_path, "failed to save data to cache file");
272 }
273 }
274 Ok(data)
275 }
276 Err(err) => match self.symbols_cache_file_path() {
277 Some(cache_file_path) => match load_file::<Vec<SymbolMetadata>>(&cache_file_path) {
278 Ok(Some(data)) => {
279 info!(?err, "failed to fetch initial symbols from history service, but fetched last known symbols from cache");
280 Ok(data)
281 }
282 Ok(None) => Err(err),
283 Err(cache_err) => {
284 warn!(?cache_err, "failed to fetch data from cache");
285 Err(err)
286 }
287 },
288 None => Err(err),
289 },
290 }
291 }
292
293 async fn fetch_symbols(&self) -> anyhow::Result<Vec<SymbolMetadata>> {
294 if self.config.urls.is_empty() {
295 bail!("no history urls provided");
296 }
297 let mut futures = self
298 .config
299 .urls
300 .iter()
301 .map(|url| Box::pin(self.fetch_symbols_single(url)))
302 .collect::<FuturesUnordered<_>>();
303 while let Some(result) = futures.next().await {
304 match result {
305 Ok(output) => return Ok(output),
306 Err(err) => {
307 warn!("failed to fetch symbols: {:?}", err);
308 }
309 }
310 }
311
312 bail!(
313 "failed to fetch symbols from any urls ({:?})",
314 self.config.urls
315 );
316 }
317
318 async fn fetch_symbols_single(&self, url: &Url) -> anyhow::Result<Vec<SymbolMetadata>> {
319 let url = url.join("v1/symbols")?;
320 retry_notify(
321 ExponentialBackoff::<SystemClock> {
322 max_elapsed_time: Some(self.config.update_interval),
325 ..Default::default()
326 },
327 || async {
328 let response = self
329 .client
330 .get(url.clone())
331 .send()
332 .await
333 .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))?
334 .backoff_error_for_status()?;
335 response
336 .json::<Vec<SymbolMetadata>>()
337 .await
338 .map_err(|err| backoff::Error::transient(anyhow::Error::from(err)))
339 },
340 |e, _| warn!("failed to fetch symbols from {} (will retry): {:?}", url, e),
341 )
342 .await
343 }
344}
345
/// Cheaply cloneable handle to the latest known symbol metadata, keyed by
/// price feed id.
///
/// The wrapped `ArcSwap` lets the stored map be replaced atomically while
/// readers take snapshots through [`SymbolMetadataHandle::symbols`].
#[derive(Debug, Clone)]
pub struct SymbolMetadataHandle(Arc<ArcSwap<HashMap<PriceFeedId, SymbolMetadata>>>);
348
349impl SymbolMetadataHandle {
350 pub fn symbols(&self) -> arc_swap::Guard<Arc<HashMap<PriceFeedId, SymbolMetadata>>> {
351 self.0.load()
352 }
353
354 pub fn new_for_test(data: HashMap<PriceFeedId, SymbolMetadata>) -> Self {
355 Self(Arc::new(ArcSwap::new(Arc::new(data))))
356 }
357}
358
/// Extension trait that turns a response's status check into a
/// `backoff`-compatible result.
trait BackoffErrorForStatusExt: Sized {
    /// Returns `self` unchanged on success, or a `backoff::Error` describing
    /// whether the failure should be retried.
    fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>>;
}
362
363impl BackoffErrorForStatusExt for reqwest::Response {
364 fn backoff_error_for_status(self) -> Result<Self, backoff::Error<anyhow::Error>> {
365 let status = self.status();
366 self.error_for_status().map_err(|err| {
367 if status.is_server_error() {
368 backoff::Error::transient(err.into())
369 } else {
370 backoff::Error::permanent(err.into())
371 }
372 })
373 }
374}
375
376fn load_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<Option<T>> {
377 let parent_path = path.parent().context("invalid file path: no parent")?;
378 fs_err::create_dir_all(parent_path)?;
379
380 if !path.try_exists()? {
381 return Ok(None);
382 }
383 let json_data = fs_err::read_to_string(path)?;
384 let data = serde_json::from_str::<T>(&json_data)?;
385 Ok(Some(data))
386}
387
388fn atomic_save_file(path: &Path, data: &impl Serialize) -> anyhow::Result<()> {
389 let parent_path = path.parent().context("invalid file path: no parent")?;
390 fs_err::create_dir_all(parent_path)?;
391
392 let json_data = serde_json::to_string(&data)?;
393 let tmp_path = path.with_extension("tmp");
394 let mut tmp_file = fs_err::File::create(&tmp_path)?;
395 tmp_file.write_all(json_data.as_bytes())?;
396 tmp_file.flush()?;
397 tmp_file.sync_all()?;
398 replace_atomic(&tmp_path, path)?;
399
400 Ok(())
401}