1use std::{
7 collections::HashMap,
8 sync::Arc,
9 time::{Duration, SystemTime},
10};
11
12use async_trait::async_trait;
13use backoff::{exponential::ExponentialBackoffBuilder, ExponentialBackoff};
14use futures03::future::try_join_all;
15#[cfg(test)]
16use mockall::automock;
17use reqwest::{header, Client, ClientBuilder, Response, StatusCode, Url};
18use serde::Serialize;
19use thiserror::Error;
20use time::{format_description::well_known::Rfc2822, OffsetDateTime};
21use tokio::{
22 sync::{RwLock, Semaphore},
23 time::sleep,
24};
25use tracing::{debug, error, instrument, trace, warn};
26use tycho_common::{
27 dto::{
28 BlockParam, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
29 EntryPointWithTracingParams, PaginationParams, PaginationResponse, ProtocolComponent,
30 ProtocolComponentRequestResponse, ProtocolComponentsRequestBody, ProtocolStateRequestBody,
31 ProtocolStateRequestResponse, ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse,
32 ResponseToken, StateRequestBody, StateRequestResponse, TokensRequestBody,
33 TokensRequestResponse, TracedEntryPointRequestBody, TracedEntryPointRequestResponse,
34 TracingResult, VersionParam,
35 },
36 models::ComponentId,
37 Bytes,
38};
39
40use crate::{
41 feed::synchronizer::{ComponentWithState, Snapshot},
42 TYCHO_SERVER_VERSION,
43};
44
45#[derive(Clone, Debug, PartialEq)]
50pub struct SnapshotParameters<'a> {
51 pub chain: Chain,
53 pub protocol_system: &'a str,
55 pub components: &'a HashMap<ComponentId, ProtocolComponent>,
57 pub entrypoints: Option<&'a HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>>>,
59 pub contract_ids: &'a [Bytes],
61 pub block_number: u64,
63 pub include_balances: bool,
65 pub include_tvl: bool,
67}
68
69impl<'a> SnapshotParameters<'a> {
70 pub fn new(
71 chain: Chain,
72 protocol_system: &'a str,
73 components: &'a HashMap<ComponentId, ProtocolComponent>,
74 contract_ids: &'a [Bytes],
75 block_number: u64,
76 ) -> Self {
77 Self {
78 chain,
79 protocol_system,
80 components,
81 entrypoints: None,
82 contract_ids,
83 block_number,
84 include_balances: true,
85 include_tvl: true,
86 }
87 }
88
89 pub fn include_balances(mut self, include_balances: bool) -> Self {
91 self.include_balances = include_balances;
92 self
93 }
94
95 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
97 self.include_tvl = include_tvl;
98 self
99 }
100
101 pub fn entrypoints(
102 mut self,
103 entrypoints: &'a HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>>,
104 ) -> Self {
105 self.entrypoints = Some(entrypoints);
106 self
107 }
108}
109
110#[derive(Error, Debug)]
111pub enum RPCError {
112 #[error("Failed to parse URL: {0}. Error: {1}")]
114 UrlParsing(String, String),
115
116 #[error("Failed to format request: {0}")]
118 FormatRequest(String),
119
120 #[error("Unexpected HTTP client error: {0}")]
122 HttpClient(String, #[source] reqwest::Error),
123
124 #[error("Failed to parse response: {0}")]
126 ParseResponse(String),
127
128 #[error("Fatal error: {0}")]
130 Fatal(String),
131
132 #[error("Rate limited until {0:?}")]
133 RateLimited(Option<SystemTime>),
134
135 #[error("Server unreachable: {0}")]
136 ServerUnreachable(String),
137}
138
139#[cfg_attr(test, automock)]
140#[async_trait]
141pub trait RPCClient: Send + Sync {
142 async fn get_contract_state(
144 &self,
145 request: &StateRequestBody,
146 ) -> Result<StateRequestResponse, RPCError>;
147
148 async fn get_contract_state_paginated(
149 &self,
150 chain: Chain,
151 ids: &[Bytes],
152 protocol_system: &str,
153 version: &VersionParam,
154 chunk_size: usize,
155 concurrency: usize,
156 ) -> Result<StateRequestResponse, RPCError> {
157 let semaphore = Arc::new(Semaphore::new(concurrency));
158
159 let mut sorted_ids = ids.to_vec();
161 sorted_ids.sort();
162
163 let chunked_bodies = sorted_ids
164 .chunks(chunk_size)
165 .map(|chunk| StateRequestBody {
166 contract_ids: Some(chunk.to_vec()),
167 protocol_system: protocol_system.to_string(),
168 chain,
169 version: version.clone(),
170 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
171 })
172 .collect::<Vec<_>>();
173
174 let mut tasks = Vec::new();
175 for body in chunked_bodies.iter() {
176 let sem = semaphore.clone();
177 tasks.push(async move {
178 let _permit = sem
179 .acquire()
180 .await
181 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
182 self.get_contract_state(body).await
183 });
184 }
185
186 let responses = try_join_all(tasks).await?;
188
189 let accounts = responses
191 .iter()
192 .flat_map(|r| r.accounts.clone())
193 .collect();
194 let total: i64 = responses
195 .iter()
196 .map(|r| r.pagination.total)
197 .sum();
198
199 Ok(StateRequestResponse {
200 accounts,
201 pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
202 })
203 }
204
205 async fn get_protocol_components(
206 &self,
207 request: &ProtocolComponentsRequestBody,
208 ) -> Result<ProtocolComponentRequestResponse, RPCError>;
209
210 async fn get_protocol_components_paginated(
211 &self,
212 request: &ProtocolComponentsRequestBody,
213 chunk_size: usize,
214 concurrency: usize,
215 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
216 let semaphore = Arc::new(Semaphore::new(concurrency));
217
218 match request.component_ids {
221 Some(ref ids) => {
222 let chunked_bodies = ids
224 .chunks(chunk_size)
225 .enumerate()
226 .map(|(index, _)| ProtocolComponentsRequestBody {
227 protocol_system: request.protocol_system.clone(),
228 component_ids: request.component_ids.clone(),
229 tvl_gt: request.tvl_gt,
230 chain: request.chain,
231 pagination: PaginationParams {
232 page: index as i64,
233 page_size: chunk_size as i64,
234 },
235 })
236 .collect::<Vec<_>>();
237
238 let mut tasks = Vec::new();
239 for body in chunked_bodies.iter() {
240 let sem = semaphore.clone();
241 tasks.push(async move {
242 let _permit = sem
243 .acquire()
244 .await
245 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
246 self.get_protocol_components(body).await
247 });
248 }
249
250 try_join_all(tasks)
251 .await
252 .map(|responses| ProtocolComponentRequestResponse {
253 protocol_components: responses
254 .into_iter()
255 .flat_map(|r| r.protocol_components.into_iter())
256 .collect(),
257 pagination: PaginationResponse {
258 page: 0,
259 page_size: chunk_size as i64,
260 total: ids.len() as i64,
261 },
262 })
263 }
264 _ => {
265 let initial_request = ProtocolComponentsRequestBody {
269 protocol_system: request.protocol_system.clone(),
270 component_ids: request.component_ids.clone(),
271 tvl_gt: request.tvl_gt,
272 chain: request.chain,
273 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
274 };
275 let first_response = self
276 .get_protocol_components(&initial_request)
277 .await
278 .map_err(|err| RPCError::Fatal(err.to_string()))?;
279
280 let total_items = first_response.pagination.total;
281 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
282
283 let mut accumulated_response = ProtocolComponentRequestResponse {
285 protocol_components: first_response.protocol_components,
286 pagination: PaginationResponse {
287 page: 0,
288 page_size: chunk_size as i64,
289 total: total_items,
290 },
291 };
292
293 let mut page = 1;
294 while page < total_pages {
295 let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
296
297 let chunked_bodies = (0..requests_in_this_iteration)
299 .map(|iter| ProtocolComponentsRequestBody {
300 protocol_system: request.protocol_system.clone(),
301 component_ids: request.component_ids.clone(),
302 tvl_gt: request.tvl_gt,
303 chain: request.chain,
304 pagination: PaginationParams {
305 page: page + iter,
306 page_size: chunk_size as i64,
307 },
308 })
309 .collect::<Vec<_>>();
310
311 let tasks: Vec<_> = chunked_bodies
312 .iter()
313 .map(|body| {
314 let sem = semaphore.clone();
315 async move {
316 let _permit = sem.acquire().await.map_err(|_| {
317 RPCError::Fatal("Semaphore dropped".to_string())
318 })?;
319 self.get_protocol_components(body).await
320 }
321 })
322 .collect();
323
324 let responses = try_join_all(tasks)
325 .await
326 .map(|responses| {
327 let total = responses[0].pagination.total;
328 ProtocolComponentRequestResponse {
329 protocol_components: responses
330 .into_iter()
331 .flat_map(|r| r.protocol_components.into_iter())
332 .collect(),
333 pagination: PaginationResponse {
334 page,
335 page_size: chunk_size as i64,
336 total,
337 },
338 }
339 });
340
341 match responses {
343 Ok(mut resp) => {
344 accumulated_response
345 .protocol_components
346 .append(&mut resp.protocol_components);
347 }
348 Err(e) => return Err(e),
349 }
350
351 page += concurrency as i64;
352 }
353 Ok(accumulated_response)
354 }
355 }
356 }
357
358 async fn get_protocol_states(
359 &self,
360 request: &ProtocolStateRequestBody,
361 ) -> Result<ProtocolStateRequestResponse, RPCError>;
362
363 #[allow(clippy::too_many_arguments)]
364 async fn get_protocol_states_paginated<T>(
365 &self,
366 chain: Chain,
367 ids: &[T],
368 protocol_system: &str,
369 include_balances: bool,
370 version: &VersionParam,
371 chunk_size: usize,
372 concurrency: usize,
373 ) -> Result<ProtocolStateRequestResponse, RPCError>
374 where
375 T: AsRef<str> + Sync + 'static,
376 {
377 let semaphore = Arc::new(Semaphore::new(concurrency));
378 let chunked_bodies = ids
379 .chunks(chunk_size)
380 .map(|c| ProtocolStateRequestBody {
381 protocol_ids: Some(
382 c.iter()
383 .map(|id| id.as_ref().to_string())
384 .collect(),
385 ),
386 protocol_system: protocol_system.to_string(),
387 chain,
388 include_balances,
389 version: version.clone(),
390 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
391 })
392 .collect::<Vec<_>>();
393
394 let mut tasks = Vec::new();
395 for body in chunked_bodies.iter() {
396 let sem = semaphore.clone();
397 tasks.push(async move {
398 let _permit = sem
399 .acquire()
400 .await
401 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
402 self.get_protocol_states(body).await
403 });
404 }
405
406 try_join_all(tasks)
407 .await
408 .map(|responses| {
409 let states = responses
410 .clone()
411 .into_iter()
412 .flat_map(|r| r.states)
413 .collect();
414 let total = responses
415 .iter()
416 .map(|r| r.pagination.total)
417 .sum();
418 ProtocolStateRequestResponse {
419 states,
420 pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
421 }
422 })
423 }
424
425 async fn get_tokens(
428 &self,
429 request: &TokensRequestBody,
430 ) -> Result<TokensRequestResponse, RPCError>;
431
432 async fn get_all_tokens(
433 &self,
434 chain: Chain,
435 min_quality: Option<i32>,
436 traded_n_days_ago: Option<u64>,
437 chunk_size: usize,
438 ) -> Result<Vec<ResponseToken>, RPCError> {
439 let mut request_page = 0;
440 let mut all_tokens = Vec::new();
441 loop {
442 let mut response = self
443 .get_tokens(&TokensRequestBody {
444 token_addresses: None,
445 min_quality,
446 traded_n_days_ago,
447 pagination: PaginationParams {
448 page: request_page,
449 page_size: chunk_size.try_into().map_err(|_| {
450 RPCError::FormatRequest(
451 "Failed to convert chunk_size into i64".to_string(),
452 )
453 })?,
454 },
455 chain,
456 })
457 .await?;
458
459 let num_tokens = response.tokens.len();
460 all_tokens.append(&mut response.tokens);
461 request_page += 1;
462
463 if num_tokens < chunk_size {
464 break;
465 }
466 }
467 Ok(all_tokens)
468 }
469
470 async fn get_protocol_systems(
471 &self,
472 request: &ProtocolSystemsRequestBody,
473 ) -> Result<ProtocolSystemsRequestResponse, RPCError>;
474
475 async fn get_component_tvl(
476 &self,
477 request: &ComponentTvlRequestBody,
478 ) -> Result<ComponentTvlRequestResponse, RPCError>;
479
480 async fn get_component_tvl_paginated(
481 &self,
482 request: &ComponentTvlRequestBody,
483 chunk_size: usize,
484 concurrency: usize,
485 ) -> Result<ComponentTvlRequestResponse, RPCError> {
486 let semaphore = Arc::new(Semaphore::new(concurrency));
487
488 match request.component_ids {
489 Some(ref ids) => {
490 let chunked_requests = ids
491 .chunks(chunk_size)
492 .enumerate()
493 .map(|(index, _)| ComponentTvlRequestBody {
494 chain: request.chain,
495 protocol_system: request.protocol_system.clone(),
496 component_ids: Some(ids.clone()),
497 pagination: PaginationParams {
498 page: index as i64,
499 page_size: chunk_size as i64,
500 },
501 })
502 .collect::<Vec<_>>();
503
504 let tasks: Vec<_> = chunked_requests
505 .into_iter()
506 .map(|req| {
507 let sem = semaphore.clone();
508 async move {
509 let _permit = sem
510 .acquire()
511 .await
512 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
513 self.get_component_tvl(&req).await
514 }
515 })
516 .collect();
517
518 let responses = try_join_all(tasks).await?;
519
520 let mut merged_tvl = HashMap::new();
521 for resp in responses {
522 for (key, value) in resp.tvl {
523 *merged_tvl.entry(key).or_insert(0.0) = value;
524 }
525 }
526
527 Ok(ComponentTvlRequestResponse {
528 tvl: merged_tvl,
529 pagination: PaginationResponse {
530 page: 0,
531 page_size: chunk_size as i64,
532 total: ids.len() as i64,
533 },
534 })
535 }
536 _ => {
537 let first_request = ComponentTvlRequestBody {
538 chain: request.chain,
539 protocol_system: request.protocol_system.clone(),
540 component_ids: request.component_ids.clone(),
541 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
542 };
543
544 let first_response = self
545 .get_component_tvl(&first_request)
546 .await?;
547 let total_items = first_response.pagination.total;
548 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
549
550 let mut merged_tvl = first_response.tvl;
551
552 let mut page = 1;
553 while page < total_pages {
554 let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
555
556 let chunked_requests: Vec<_> = (0..requests_in_this_iteration)
557 .map(|i| ComponentTvlRequestBody {
558 chain: request.chain,
559 protocol_system: request.protocol_system.clone(),
560 component_ids: request.component_ids.clone(),
561 pagination: PaginationParams {
562 page: page + i,
563 page_size: chunk_size as i64,
564 },
565 })
566 .collect();
567
568 let tasks: Vec<_> = chunked_requests
569 .into_iter()
570 .map(|req| {
571 let sem = semaphore.clone();
572 async move {
573 let _permit = sem.acquire().await.map_err(|_| {
574 RPCError::Fatal("Semaphore dropped".to_string())
575 })?;
576 self.get_component_tvl(&req).await
577 }
578 })
579 .collect();
580
581 let responses = try_join_all(tasks).await?;
582
583 for resp in responses {
585 for (key, value) in resp.tvl {
586 *merged_tvl.entry(key).or_insert(0.0) += value;
587 }
588 }
589
590 page += concurrency as i64;
591 }
592
593 Ok(ComponentTvlRequestResponse {
594 tvl: merged_tvl,
595 pagination: PaginationResponse {
596 page: 0,
597 page_size: chunk_size as i64,
598 total: total_items,
599 },
600 })
601 }
602 }
603 }
604
605 async fn get_traced_entry_points(
606 &self,
607 request: &TracedEntryPointRequestBody,
608 ) -> Result<TracedEntryPointRequestResponse, RPCError>;
609
610 async fn get_traced_entry_points_paginated(
611 &self,
612 chain: Chain,
613 protocol_system: &str,
614 component_ids: &[String],
615 chunk_size: usize,
616 concurrency: usize,
617 ) -> Result<TracedEntryPointRequestResponse, RPCError> {
618 let semaphore = Arc::new(Semaphore::new(concurrency));
619 let chunked_bodies = component_ids
620 .chunks(chunk_size)
621 .map(|c| TracedEntryPointRequestBody {
622 chain,
623 protocol_system: protocol_system.to_string(),
624 component_ids: Some(c.to_vec()),
625 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
626 })
627 .collect::<Vec<_>>();
628
629 let mut tasks = Vec::new();
630 for body in chunked_bodies.iter() {
631 let sem = semaphore.clone();
632 tasks.push(async move {
633 let _permit = sem
634 .acquire()
635 .await
636 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
637 self.get_traced_entry_points(body).await
638 });
639 }
640
641 try_join_all(tasks)
642 .await
643 .map(|responses| {
644 let traced_entry_points = responses
645 .clone()
646 .into_iter()
647 .flat_map(|r| r.traced_entry_points)
648 .collect();
649 let total = responses
650 .iter()
651 .map(|r| r.pagination.total)
652 .sum();
653 TracedEntryPointRequestResponse {
654 traced_entry_points,
655 pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
656 }
657 })
658 }
659
660 async fn get_snapshots<'a>(
661 &self,
662 request: &SnapshotParameters<'a>,
663 chunk_size: usize,
664 concurrency: usize,
665 ) -> Result<Snapshot, RPCError>;
666}
667
668#[derive(Debug, Clone)]
669pub struct HttpRPCClient {
670 http_client: Client,
671 url: Url,
672 retry_after: Arc<RwLock<Option<SystemTime>>>,
673 backoff_policy: ExponentialBackoff,
674 server_restart_duration: Duration,
675}
676
677impl HttpRPCClient {
678 pub fn new(base_uri: &str, auth_key: Option<&str>) -> Result<Self, RPCError> {
679 let uri = base_uri
680 .parse::<Url>()
681 .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
682
683 let mut headers = header::HeaderMap::new();
685 headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
686 let user_agent = format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION"));
687 headers.insert(
688 header::USER_AGENT,
689 header::HeaderValue::from_str(&user_agent)
690 .map_err(|e| RPCError::FormatRequest(format!("Invalid user agent format: {e}")))?,
691 );
692
693 if let Some(key) = auth_key {
695 let mut auth_value = header::HeaderValue::from_str(key).map_err(|e| {
696 RPCError::FormatRequest(format!("Invalid authorization key format: {e}"))
697 })?;
698 auth_value.set_sensitive(true);
699 headers.insert(header::AUTHORIZATION, auth_value);
700 }
701
702 let client = ClientBuilder::new()
703 .default_headers(headers)
704 .http2_prior_knowledge()
705 .build()
706 .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
707 Ok(Self {
708 http_client: client,
709 url: uri,
710 retry_after: Arc::new(RwLock::new(None)),
711 backoff_policy: ExponentialBackoffBuilder::new()
712 .with_initial_interval(Duration::from_millis(250))
713 .with_multiplier(1.75)
715 .with_max_interval(Duration::from_secs(30))
717 .with_max_elapsed_time(Some(Duration::from_secs(125)))
719 .build(),
720 server_restart_duration: Duration::from_secs(120),
721 })
722 }
723
724 #[cfg(test)]
725 pub fn with_test_backoff_policy(mut self) -> Self {
726 self.backoff_policy = ExponentialBackoffBuilder::new()
728 .with_initial_interval(Duration::from_millis(1))
729 .with_multiplier(1.1)
730 .with_max_interval(Duration::from_millis(5))
731 .with_max_elapsed_time(Some(Duration::from_millis(50)))
732 .build();
733 self.server_restart_duration = Duration::from_millis(50);
734 self
735 }
736
737 async fn error_for_response(
743 &self,
744 response: reqwest::Response,
745 ) -> Result<reqwest::Response, RPCError> {
746 match response.status() {
747 StatusCode::TOO_MANY_REQUESTS => {
748 let retry_after_raw = response
749 .headers()
750 .get(reqwest::header::RETRY_AFTER)
751 .and_then(|h| h.to_str().ok())
752 .and_then(parse_retry_value);
753
754 Err(RPCError::RateLimited(retry_after_raw))
755 }
756 StatusCode::BAD_GATEWAY |
757 StatusCode::SERVICE_UNAVAILABLE |
758 StatusCode::GATEWAY_TIMEOUT => Err(RPCError::ServerUnreachable(
759 response
760 .text()
761 .await
762 .unwrap_or_else(|_| "Server Unreachable".to_string()),
763 )),
764 _ => Ok(response),
765 }
766 }
767
768 async fn handle_error_for_backoff(&self, e: RPCError) -> backoff::Error<RPCError> {
774 match e {
775 RPCError::ServerUnreachable(_) => {
776 backoff::Error::retry_after(e, self.server_restart_duration)
777 }
778 RPCError::RateLimited(Some(until)) => {
779 let mut retry_after_guard = self.retry_after.write().await;
780 *retry_after_guard = Some(
781 retry_after_guard
782 .unwrap_or(until)
783 .max(until),
784 );
785
786 if let Ok(duration) = until.duration_since(SystemTime::now()) {
787 backoff::Error::retry_after(e, duration)
788 } else {
789 e.into()
790 }
791 }
792 RPCError::RateLimited(None) => e.into(),
793 _ => backoff::Error::permanent(e),
794 }
795 }
796
797 async fn wait_until_retry_after(&self) {
802 if let Some(&until) = self.retry_after.read().await.as_ref() {
803 let now = SystemTime::now();
804 if until > now {
805 if let Ok(duration) = until.duration_since(now) {
806 sleep(duration).await
807 }
808 }
809 }
810 }
811
812 async fn make_post_request<T: Serialize + ?Sized>(
817 &self,
818 request: &T,
819 uri: &String,
820 ) -> Result<Response, RPCError> {
821 self.wait_until_retry_after().await;
822 let response = backoff::future::retry(self.backoff_policy.clone(), || async {
823 let server_response = self
824 .http_client
825 .post(uri)
826 .json(request)
827 .send()
828 .await
829 .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
830
831 match self
832 .error_for_response(server_response)
833 .await
834 {
835 Ok(response) => Ok(response),
836 Err(e) => Err(self.handle_error_for_backoff(e).await),
837 }
838 })
839 .await?;
840 Ok(response)
841 }
842}
843
844fn parse_retry_value(val: &str) -> Option<SystemTime> {
845 if let Ok(secs) = val.parse::<u64>() {
846 return Some(SystemTime::now() + Duration::from_secs(secs));
847 }
848 if let Ok(date) = OffsetDateTime::parse(val, &Rfc2822) {
849 return Some(date.into());
850 }
851 None
852}
853
854#[async_trait]
855impl RPCClient for HttpRPCClient {
856 #[instrument(skip(self, request))]
857 async fn get_contract_state(
858 &self,
859 request: &StateRequestBody,
860 ) -> Result<StateRequestResponse, RPCError> {
861 if request
863 .contract_ids
864 .as_ref()
865 .is_none_or(|ids| ids.is_empty())
866 {
867 warn!("No contract ids specified in request.");
868 }
869
870 let uri = format!(
871 "{}/{}/contract_state",
872 self.url
873 .to_string()
874 .trim_end_matches('/'),
875 TYCHO_SERVER_VERSION
876 );
877 debug!(%uri, "Sending contract_state request to Tycho server");
878 trace!(?request, "Sending request to Tycho server");
879 let response = self
880 .make_post_request(request, &uri)
881 .await?;
882 trace!(?response, "Received response from Tycho server");
883
884 let body = response
885 .text()
886 .await
887 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
888 if body.is_empty() {
889 return Ok(StateRequestResponse {
891 accounts: vec![],
892 pagination: PaginationResponse {
893 page: request.pagination.page,
894 page_size: request.pagination.page,
895 total: 0,
896 },
897 });
898 }
899
900 let accounts = serde_json::from_str::<StateRequestResponse>(&body)
901 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
902 trace!(?accounts, "Received contract_state response from Tycho server");
903
904 Ok(accounts)
905 }
906
907 async fn get_protocol_components(
908 &self,
909 request: &ProtocolComponentsRequestBody,
910 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
911 let uri = format!(
912 "{}/{}/protocol_components",
913 self.url
914 .to_string()
915 .trim_end_matches('/'),
916 TYCHO_SERVER_VERSION,
917 );
918 debug!(%uri, "Sending protocol_components request to Tycho server");
919 trace!(?request, "Sending request to Tycho server");
920
921 let response = self
922 .make_post_request(request, &uri)
923 .await?;
924
925 trace!(?response, "Received response from Tycho server");
926
927 let body = response
928 .text()
929 .await
930 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
931 let components = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
932 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
933 trace!(?components, "Received protocol_components response from Tycho server");
934
935 Ok(components)
936 }
937
938 async fn get_protocol_states(
939 &self,
940 request: &ProtocolStateRequestBody,
941 ) -> Result<ProtocolStateRequestResponse, RPCError> {
942 if request
944 .protocol_ids
945 .as_ref()
946 .is_none_or(|ids| ids.is_empty())
947 {
948 warn!("No protocol ids specified in request.");
949 }
950
951 let uri = format!(
952 "{}/{}/protocol_state",
953 self.url
954 .to_string()
955 .trim_end_matches('/'),
956 TYCHO_SERVER_VERSION
957 );
958 debug!(%uri, "Sending protocol_states request to Tycho server");
959 trace!(?request, "Sending request to Tycho server");
960
961 let response = self
962 .make_post_request(request, &uri)
963 .await?;
964 trace!(?response, "Received response from Tycho server");
965
966 let body = response
967 .text()
968 .await
969 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
970
971 if body.is_empty() {
972 return Ok(ProtocolStateRequestResponse {
974 states: vec![],
975 pagination: PaginationResponse {
976 page: request.pagination.page,
977 page_size: request.pagination.page_size,
978 total: 0,
979 },
980 });
981 }
982
983 let states = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
984 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
985 trace!(?states, "Received protocol_states response from Tycho server");
986
987 Ok(states)
988 }
989
990 async fn get_tokens(
991 &self,
992 request: &TokensRequestBody,
993 ) -> Result<TokensRequestResponse, RPCError> {
994 let uri = format!(
995 "{}/{}/tokens",
996 self.url
997 .to_string()
998 .trim_end_matches('/'),
999 TYCHO_SERVER_VERSION
1000 );
1001 debug!(%uri, "Sending tokens request to Tycho server");
1002
1003 let response = self
1004 .make_post_request(request, &uri)
1005 .await?;
1006
1007 let body = response
1008 .text()
1009 .await
1010 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1011 let tokens = serde_json::from_str::<TokensRequestResponse>(&body)
1012 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1013
1014 Ok(tokens)
1015 }
1016
1017 async fn get_protocol_systems(
1018 &self,
1019 request: &ProtocolSystemsRequestBody,
1020 ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
1021 let uri = format!(
1022 "{}/{}/protocol_systems",
1023 self.url
1024 .to_string()
1025 .trim_end_matches('/'),
1026 TYCHO_SERVER_VERSION
1027 );
1028 debug!(%uri, "Sending protocol_systems request to Tycho server");
1029 trace!(?request, "Sending request to Tycho server");
1030 let response = self
1031 .make_post_request(request, &uri)
1032 .await?;
1033 trace!(?response, "Received response from Tycho server");
1034 let body = response
1035 .text()
1036 .await
1037 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1038 let protocol_systems = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
1039 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1040 trace!(?protocol_systems, "Received protocol_systems response from Tycho server");
1041 Ok(protocol_systems)
1042 }
1043
1044 async fn get_component_tvl(
1045 &self,
1046 request: &ComponentTvlRequestBody,
1047 ) -> Result<ComponentTvlRequestResponse, RPCError> {
1048 let uri = format!(
1049 "{}/{}/component_tvl",
1050 self.url
1051 .to_string()
1052 .trim_end_matches('/'),
1053 TYCHO_SERVER_VERSION
1054 );
1055 debug!(%uri, "Sending get_component_tvl request to Tycho server");
1056 trace!(?request, "Sending request to Tycho server");
1057 let response = self
1058 .make_post_request(request, &uri)
1059 .await?;
1060 trace!(?response, "Received response from Tycho server");
1061 let body = response
1062 .text()
1063 .await
1064 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1065 let component_tvl =
1066 serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
1067 error!("Failed to parse component_tvl response: {:?}", &body);
1068 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1069 })?;
1070 trace!(?component_tvl, "Received component_tvl response from Tycho server");
1071 Ok(component_tvl)
1072 }
1073
1074 async fn get_traced_entry_points(
1075 &self,
1076 request: &TracedEntryPointRequestBody,
1077 ) -> Result<TracedEntryPointRequestResponse, RPCError> {
1078 let uri = format!(
1079 "{}/{TYCHO_SERVER_VERSION}/traced_entry_points",
1080 self.url
1081 .to_string()
1082 .trim_end_matches('/')
1083 );
1084 debug!(%uri, "Sending traced_entry_points request to Tycho server");
1085 trace!(?request, "Sending request to Tycho server");
1086
1087 let response = self
1088 .make_post_request(request, &uri)
1089 .await?;
1090
1091 trace!(?response, "Received response from Tycho server");
1092
1093 let body = response
1094 .text()
1095 .await
1096 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1097 let entrypoints =
1098 serde_json::from_str::<TracedEntryPointRequestResponse>(&body).map_err(|err| {
1099 error!("Failed to parse traced_entry_points response: {:?}", &body);
1100 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1101 })?;
1102 trace!(?entrypoints, "Received traced_entry_points response from Tycho server");
1103 Ok(entrypoints)
1104 }
1105
1106 async fn get_snapshots<'a>(
1107 &self,
1108 request: &SnapshotParameters<'a>,
1109 chunk_size: usize,
1110 concurrency: usize,
1111 ) -> Result<Snapshot, RPCError> {
1112 let component_ids: Vec<_> = request
1113 .components
1114 .keys()
1115 .cloned()
1116 .collect();
1117
1118 let version = VersionParam::new(
1119 None,
1120 Some({
1121 #[allow(deprecated)]
1122 BlockParam {
1123 hash: None,
1124 chain: Some(request.chain),
1125 number: Some(request.block_number as i64),
1126 }
1127 }),
1128 );
1129
1130 let component_tvl = if request.include_tvl && !component_ids.is_empty() {
1131 let body = ComponentTvlRequestBody::id_filtered(component_ids.clone(), request.chain);
1132 self.get_component_tvl_paginated(&body, chunk_size, concurrency)
1133 .await?
1134 .tvl
1135 } else {
1136 HashMap::new()
1137 };
1138
1139 let mut protocol_states = if !component_ids.is_empty() {
1140 self.get_protocol_states_paginated(
1141 request.chain,
1142 &component_ids,
1143 request.protocol_system,
1144 request.include_balances,
1145 &version,
1146 chunk_size,
1147 concurrency,
1148 )
1149 .await?
1150 .states
1151 .into_iter()
1152 .map(|state| (state.component_id.clone(), state))
1153 .collect()
1154 } else {
1155 HashMap::new()
1156 };
1157
1158 let states = request
1160 .components
1161 .values()
1162 .filter_map(|component| {
1163 if let Some(state) = protocol_states.remove(&component.id) {
1164 Some((
1165 component.id.clone(),
1166 ComponentWithState {
1167 state,
1168 component: component.clone(),
1169 component_tvl: component_tvl
1170 .get(&component.id)
1171 .cloned(),
1172 entrypoints: request
1173 .entrypoints
1174 .as_ref()
1175 .and_then(|map| map.get(&component.id))
1176 .cloned()
1177 .unwrap_or_default(),
1178 },
1179 ))
1180 } else if component_ids.contains(&component.id) {
1181 let component_id = &component.id;
1183 error!(?component_id, "Missing state for native component!");
1184 None
1185 } else {
1186 None
1187 }
1188 })
1189 .collect();
1190
1191 let vm_storage = if !request.contract_ids.is_empty() {
1192 let contract_states = self
1193 .get_contract_state_paginated(
1194 request.chain,
1195 request.contract_ids,
1196 request.protocol_system,
1197 &version,
1198 chunk_size,
1199 concurrency,
1200 )
1201 .await?
1202 .accounts
1203 .into_iter()
1204 .map(|acc| (acc.address.clone(), acc))
1205 .collect::<HashMap<_, _>>();
1206
1207 trace!(states=?&contract_states, "Retrieved ContractState");
1208
1209 let contract_address_to_components = request
1210 .components
1211 .iter()
1212 .filter_map(|(id, comp)| {
1213 if component_ids.contains(id) {
1214 Some(
1215 comp.contract_ids
1216 .iter()
1217 .map(|address| (address.clone(), comp.id.clone())),
1218 )
1219 } else {
1220 None
1221 }
1222 })
1223 .flatten()
1224 .fold(HashMap::<Bytes, Vec<String>>::new(), |mut acc, (addr, c_id)| {
1225 acc.entry(addr).or_default().push(c_id);
1226 acc
1227 });
1228
1229 request
1230 .contract_ids
1231 .iter()
1232 .filter_map(|address| {
1233 if let Some(state) = contract_states.get(address) {
1234 Some((address.clone(), state.clone()))
1235 } else if let Some(ids) = contract_address_to_components.get(address) {
1236 error!(
1238 ?address,
1239 ?ids,
1240 "Component with lacking contract storage encountered!"
1241 );
1242 None
1243 } else {
1244 None
1245 }
1246 })
1247 .collect()
1248 } else {
1249 HashMap::new()
1250 };
1251
1252 Ok(Snapshot { states, vm_storage })
1253 }
1254}
1255
1256#[cfg(test)]
1257mod tests {
1258 use std::{
1259 collections::{HashMap, HashSet},
1260 str::FromStr,
1261 };
1262
1263 use mockito::Server;
1264 use rstest::rstest;
1265 #[allow(deprecated)]
1267 use tycho_common::dto::ProtocolId;
1268 use tycho_common::dto::{AddressStorageLocation, TracingParams};
1269
1270 use super::*;
1271
1272 impl MockRPCClient {
1275 #[allow(clippy::too_many_arguments)]
1276 async fn test_get_protocol_states_paginated<T>(
1277 &self,
1278 chain: Chain,
1279 ids: &[T],
1280 protocol_system: &str,
1281 include_balances: bool,
1282 version: &VersionParam,
1283 chunk_size: usize,
1284 _concurrency: usize,
1285 ) -> Vec<ProtocolStateRequestBody>
1286 where
1287 T: AsRef<str> + Clone + Send + Sync + 'static,
1288 {
1289 ids.chunks(chunk_size)
1290 .map(|chunk| ProtocolStateRequestBody {
1291 protocol_ids: Some(
1292 chunk
1293 .iter()
1294 .map(|id| id.as_ref().to_string())
1295 .collect(),
1296 ),
1297 protocol_system: protocol_system.to_string(),
1298 chain,
1299 include_balances,
1300 version: version.clone(),
1301 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
1302 })
1303 .collect()
1304 }
1305 }
1306
1307 #[allow(deprecated)]
1309 #[rstest]
1310 #[case::protocol_id_input(vec![
1311 ProtocolId { id: "id1".to_string(), chain: Chain::Ethereum },
1312 ProtocolId { id: "id2".to_string(), chain: Chain::Ethereum }
1313 ])]
1314 #[case::string_input(vec![
1315 "id1".to_string(),
1316 "id2".to_string()
1317 ])]
1318 #[tokio::test]
1319 async fn test_get_protocol_states_paginated_backwards_compatibility<T>(#[case] ids: Vec<T>)
1320 where
1321 T: AsRef<str> + Clone + Send + Sync + 'static,
1322 {
1323 let mock_client = MockRPCClient::new();
1324
1325 let request_bodies = mock_client
1326 .test_get_protocol_states_paginated(
1327 Chain::Ethereum,
1328 &ids,
1329 "test_system",
1330 true,
1331 &VersionParam::default(),
1332 2,
1333 2,
1334 )
1335 .await;
1336
1337 assert_eq!(request_bodies.len(), 1);
1339 assert_eq!(
1340 request_bodies[0]
1341 .protocol_ids
1342 .as_ref()
1343 .unwrap()
1344 .len(),
1345 2
1346 );
1347 }
1348
1349 #[tokio::test]
1350 async fn test_get_contract_state() {
1351 let mut server = Server::new_async().await;
1352 let server_resp = r#"
1353 {
1354 "accounts": [
1355 {
1356 "chain": "ethereum",
1357 "address": "0x0000000000000000000000000000000000000000",
1358 "title": "",
1359 "slots": {},
1360 "native_balance": "0x01f4",
1361 "token_balances": {},
1362 "code": "0x00",
1363 "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
1364 "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
1365 "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
1366 "creation_tx": null
1367 }
1368 ],
1369 "pagination": {
1370 "page": 0,
1371 "page_size": 20,
1372 "total": 10
1373 }
1374 }
1375 "#;
1376 serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
1378
1379 let mocked_server = server
1380 .mock("POST", "/v1/contract_state")
1381 .expect(1)
1382 .with_body(server_resp)
1383 .create_async()
1384 .await;
1385
1386 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1387
1388 let response = client
1389 .get_contract_state(&Default::default())
1390 .await
1391 .expect("get state");
1392 let accounts = response.accounts;
1393
1394 mocked_server.assert();
1395 assert_eq!(accounts.len(), 1);
1396 assert_eq!(accounts[0].slots, HashMap::new());
1397 assert_eq!(accounts[0].native_balance, Bytes::from(500u16.to_be_bytes()));
1398 assert_eq!(accounts[0].code, [0].to_vec());
1399 assert_eq!(
1400 accounts[0].code_hash,
1401 hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
1402 .unwrap()
1403 );
1404 }
1405
1406 #[tokio::test]
1407 async fn test_get_protocol_components() {
1408 let mut server = Server::new_async().await;
1409 let server_resp = r#"
1410 {
1411 "protocol_components": [
1412 {
1413 "id": "State1",
1414 "protocol_system": "ambient",
1415 "protocol_type_name": "Pool",
1416 "chain": "ethereum",
1417 "tokens": [
1418 "0x0000000000000000000000000000000000000000",
1419 "0x0000000000000000000000000000000000000001"
1420 ],
1421 "contract_ids": [
1422 "0x0000000000000000000000000000000000000000"
1423 ],
1424 "static_attributes": {
1425 "attribute_1": "0x00000000000003e8"
1426 },
1427 "change": "Creation",
1428 "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
1429 "created_at": "2022-01-01T00:00:00"
1430 }
1431 ],
1432 "pagination": {
1433 "page": 0,
1434 "page_size": 20,
1435 "total": 10
1436 }
1437 }
1438 "#;
1439 serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
1441
1442 let mocked_server = server
1443 .mock("POST", "/v1/protocol_components")
1444 .expect(1)
1445 .with_body(server_resp)
1446 .create_async()
1447 .await;
1448
1449 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1450
1451 let response = client
1452 .get_protocol_components(&Default::default())
1453 .await
1454 .expect("get state");
1455 let components = response.protocol_components;
1456
1457 mocked_server.assert();
1458 assert_eq!(components.len(), 1);
1459 assert_eq!(components[0].id, "State1");
1460 assert_eq!(components[0].protocol_system, "ambient");
1461 assert_eq!(components[0].protocol_type_name, "Pool");
1462 assert_eq!(components[0].tokens.len(), 2);
1463 let expected_attributes =
1464 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1465 .iter()
1466 .cloned()
1467 .collect::<HashMap<String, Bytes>>();
1468 assert_eq!(components[0].static_attributes, expected_attributes);
1469 }
1470
1471 #[tokio::test]
1472 async fn test_get_protocol_states() {
1473 let mut server = Server::new_async().await;
1474 let server_resp = r#"
1475 {
1476 "states": [
1477 {
1478 "component_id": "State1",
1479 "attributes": {
1480 "attribute_1": "0x00000000000003e8"
1481 },
1482 "balances": {
1483 "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
1484 }
1485 }
1486 ],
1487 "pagination": {
1488 "page": 0,
1489 "page_size": 20,
1490 "total": 10
1491 }
1492 }
1493 "#;
1494 serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
1496
1497 let mocked_server = server
1498 .mock("POST", "/v1/protocol_state")
1499 .expect(1)
1500 .with_body(server_resp)
1501 .create_async()
1502 .await;
1503 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1504
1505 let response = client
1506 .get_protocol_states(&Default::default())
1507 .await
1508 .expect("get state");
1509 let states = response.states;
1510
1511 mocked_server.assert();
1512 assert_eq!(states.len(), 1);
1513 assert_eq!(states[0].component_id, "State1");
1514 let expected_attributes =
1515 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1516 .iter()
1517 .cloned()
1518 .collect::<HashMap<String, Bytes>>();
1519 assert_eq!(states[0].attributes, expected_attributes);
1520 let expected_balances = [(
1521 Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
1522 .expect("Unsupported address format"),
1523 Bytes::from_str("0x01f4").unwrap(),
1524 )]
1525 .iter()
1526 .cloned()
1527 .collect::<HashMap<Bytes, Bytes>>();
1528 assert_eq!(states[0].balances, expected_balances);
1529 }
1530
1531 #[tokio::test]
1532 async fn test_get_tokens() {
1533 let mut server = Server::new_async().await;
1534 let server_resp = r#"
1535 {
1536 "tokens": [
1537 {
1538 "chain": "ethereum",
1539 "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
1540 "symbol": "WETH",
1541 "decimals": 18,
1542 "tax": 0,
1543 "gas": [
1544 29962
1545 ],
1546 "quality": 100
1547 },
1548 {
1549 "chain": "ethereum",
1550 "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
1551 "symbol": "USDC",
1552 "decimals": 6,
1553 "tax": 0,
1554 "gas": [
1555 40652
1556 ],
1557 "quality": 100
1558 }
1559 ],
1560 "pagination": {
1561 "page": 0,
1562 "page_size": 20,
1563 "total": 10
1564 }
1565 }
1566 "#;
1567 serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
1569
1570 let mocked_server = server
1571 .mock("POST", "/v1/tokens")
1572 .expect(1)
1573 .with_body(server_resp)
1574 .create_async()
1575 .await;
1576 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1577
1578 let response = client
1579 .get_tokens(&Default::default())
1580 .await
1581 .expect("get tokens");
1582
1583 let expected = vec![
1584 ResponseToken {
1585 chain: Chain::Ethereum,
1586 address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
1587 symbol: "WETH".to_string(),
1588 decimals: 18,
1589 tax: 0,
1590 gas: vec![Some(29962)],
1591 quality: 100,
1592 },
1593 ResponseToken {
1594 chain: Chain::Ethereum,
1595 address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
1596 symbol: "USDC".to_string(),
1597 decimals: 6,
1598 tax: 0,
1599 gas: vec![Some(40652)],
1600 quality: 100,
1601 },
1602 ];
1603
1604 mocked_server.assert();
1605 assert_eq!(response.tokens, expected);
1606 assert_eq!(response.pagination, PaginationResponse { page: 0, page_size: 20, total: 10 });
1607 }
1608
1609 #[tokio::test]
1610 async fn test_get_protocol_systems() {
1611 let mut server = Server::new_async().await;
1612 let server_resp = r#"
1613 {
1614 "protocol_systems": [
1615 "system1",
1616 "system2"
1617 ],
1618 "pagination": {
1619 "page": 0,
1620 "page_size": 20,
1621 "total": 10
1622 }
1623 }
1624 "#;
1625 serde_json::from_str::<ProtocolSystemsRequestResponse>(server_resp).expect("deserialize");
1627
1628 let mocked_server = server
1629 .mock("POST", "/v1/protocol_systems")
1630 .expect(1)
1631 .with_body(server_resp)
1632 .create_async()
1633 .await;
1634 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1635
1636 let response = client
1637 .get_protocol_systems(&Default::default())
1638 .await
1639 .expect("get protocol systems");
1640 let protocol_systems = response.protocol_systems;
1641
1642 mocked_server.assert();
1643 assert_eq!(protocol_systems, vec!["system1", "system2"]);
1644 }
1645
1646 #[tokio::test]
1647 async fn test_get_component_tvl() {
1648 let mut server = Server::new_async().await;
1649 let server_resp = r#"
1650 {
1651 "tvl": {
1652 "component1": 100.0
1653 },
1654 "pagination": {
1655 "page": 0,
1656 "page_size": 20,
1657 "total": 10
1658 }
1659 }
1660 "#;
1661 serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
1663
1664 let mocked_server = server
1665 .mock("POST", "/v1/component_tvl")
1666 .expect(1)
1667 .with_body(server_resp)
1668 .create_async()
1669 .await;
1670 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1671
1672 let response = client
1673 .get_component_tvl(&Default::default())
1674 .await
1675 .expect("get protocol systems");
1676 let component_tvl = response.tvl;
1677
1678 mocked_server.assert();
1679 assert_eq!(component_tvl.get("component1"), Some(&100.0));
1680 }
1681
1682 #[tokio::test]
1683 async fn test_get_traced_entry_points() {
1684 let mut server = Server::new_async().await;
1685 let server_resp = r#"
1686 {
1687 "traced_entry_points": {
1688 "component_1": [
1689 [
1690 {
1691 "entry_point": {
1692 "external_id": "entrypoint_a",
1693 "target": "0x0000000000000000000000000000000000000001",
1694 "signature": "sig()"
1695 },
1696 "params": {
1697 "method": "rpctracer",
1698 "caller": "0x000000000000000000000000000000000000000a",
1699 "calldata": "0x000000000000000000000000000000000000000b"
1700 }
1701 },
1702 {
1703 "retriggers": [
1704 [
1705 "0x00000000000000000000000000000000000000aa",
1706 {"key": "0x0000000000000000000000000000000000000aaa", "offset": 12}
1707 ]
1708 ],
1709 "accessed_slots": {
1710 "0x0000000000000000000000000000000000aaaa": [
1711 "0x0000000000000000000000000000000000aaaa"
1712 ]
1713 }
1714 }
1715 ]
1716 ]
1717 },
1718 "pagination": {
1719 "page": 0,
1720 "page_size": 20,
1721 "total": 1
1722 }
1723 }
1724 "#;
1725 serde_json::from_str::<TracedEntryPointRequestResponse>(server_resp).expect("deserialize");
1727
1728 let mocked_server = server
1729 .mock("POST", "/v1/traced_entry_points")
1730 .expect(1)
1731 .with_body(server_resp)
1732 .create_async()
1733 .await;
1734 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1735
1736 let response = client
1737 .get_traced_entry_points(&Default::default())
1738 .await
1739 .expect("get traced entry points");
1740 let entrypoints = response.traced_entry_points;
1741
1742 mocked_server.assert();
1743 assert_eq!(entrypoints.len(), 1);
1744 let comp1_entrypoints = entrypoints
1745 .get("component_1")
1746 .expect("component_1 entrypoints should exist");
1747 assert_eq!(comp1_entrypoints.len(), 1);
1748
1749 let (entrypoint, trace_result) = &comp1_entrypoints[0];
1750 assert_eq!(entrypoint.entry_point.external_id, "entrypoint_a");
1751 assert_eq!(
1752 entrypoint.entry_point.target,
1753 Bytes::from_str("0x0000000000000000000000000000000000000001").unwrap()
1754 );
1755 assert_eq!(entrypoint.entry_point.signature, "sig()");
1756 let TracingParams::RPCTracer(rpc_params) = &entrypoint.params;
1757 assert_eq!(
1758 rpc_params.caller,
1759 Some(Bytes::from("0x000000000000000000000000000000000000000a"))
1760 );
1761 assert_eq!(rpc_params.calldata, Bytes::from("0x000000000000000000000000000000000000000b"));
1762
1763 assert_eq!(
1764 trace_result.retriggers,
1765 HashSet::from([(
1766 Bytes::from("0x00000000000000000000000000000000000000aa"),
1767 AddressStorageLocation::new(
1768 Bytes::from("0x0000000000000000000000000000000000000aaa"),
1769 12
1770 )
1771 )])
1772 );
1773 assert_eq!(trace_result.accessed_slots.len(), 1);
1774 assert_eq!(
1775 trace_result.accessed_slots,
1776 HashMap::from([(
1777 Bytes::from("0x0000000000000000000000000000000000aaaa"),
1778 HashSet::from([Bytes::from("0x0000000000000000000000000000000000aaaa")])
1779 )])
1780 );
1781 }
1782
1783 #[tokio::test]
1784 async fn test_parse_retry_value_numeric() {
1785 let result = parse_retry_value("60");
1786 assert!(result.is_some());
1787
1788 let expected_time = SystemTime::now() + Duration::from_secs(60);
1789 let actual_time = result.unwrap();
1790
1791 let diff = if actual_time > expected_time {
1793 actual_time
1794 .duration_since(expected_time)
1795 .unwrap()
1796 } else {
1797 expected_time
1798 .duration_since(actual_time)
1799 .unwrap()
1800 };
1801 assert!(diff < Duration::from_secs(1), "Time difference too large: {:?}", diff);
1802 }
1803
1804 #[tokio::test]
1805 async fn test_parse_retry_value_rfc2822() {
1806 let rfc2822_date = "Sat, 01 Jan 2030 12:00:00 +0000";
1808 let result = parse_retry_value(rfc2822_date);
1809 assert!(result.is_some());
1810
1811 let parsed_time = result.unwrap();
1812 assert!(parsed_time > SystemTime::now());
1813 }
1814
1815 #[tokio::test]
1816 async fn test_parse_retry_value_invalid_formats() {
1817 assert!(parse_retry_value("invalid").is_none());
1819 assert!(parse_retry_value("").is_none());
1820 assert!(parse_retry_value("not_a_number").is_none());
1821 assert!(parse_retry_value("Mon, 32 Jan 2030 25:00:00 +0000").is_none()); }
1823
1824 #[tokio::test]
1825 async fn test_parse_retry_value_zero_seconds() {
1826 let result = parse_retry_value("0");
1827 assert!(result.is_some());
1828
1829 let expected_time = SystemTime::now();
1830 let actual_time = result.unwrap();
1831
1832 let diff = if actual_time > expected_time {
1834 actual_time
1835 .duration_since(expected_time)
1836 .unwrap()
1837 } else {
1838 expected_time
1839 .duration_since(actual_time)
1840 .unwrap()
1841 };
1842 assert!(diff < Duration::from_secs(1));
1843 }
1844
1845 #[tokio::test]
1846 async fn test_error_for_response_rate_limited() {
1847 let mut server = Server::new_async().await;
1848 let mock = server
1849 .mock("GET", "/test")
1850 .with_status(429)
1851 .with_header("Retry-After", "60")
1852 .create_async()
1853 .await;
1854
1855 let client = reqwest::Client::new();
1856 let response = client
1857 .get(format!("{}/test", server.url()))
1858 .send()
1859 .await
1860 .unwrap();
1861
1862 let http_client = HttpRPCClient::new(server.url().as_str(), None)
1863 .unwrap()
1864 .with_test_backoff_policy();
1865 let result = http_client
1866 .error_for_response(response)
1867 .await;
1868
1869 mock.assert();
1870 assert!(matches!(result, Err(RPCError::RateLimited(_))));
1871 if let Err(RPCError::RateLimited(retry_after)) = result {
1872 assert!(retry_after.is_some());
1873 }
1874 }
1875
1876 #[tokio::test]
1877 async fn test_error_for_response_rate_limited_no_header() {
1878 let mut server = Server::new_async().await;
1879 let mock = server
1880 .mock("GET", "/test")
1881 .with_status(429)
1882 .create_async()
1883 .await;
1884
1885 let client = reqwest::Client::new();
1886 let response = client
1887 .get(format!("{}/test", server.url()))
1888 .send()
1889 .await
1890 .unwrap();
1891
1892 let http_client = HttpRPCClient::new(server.url().as_str(), None)
1893 .unwrap()
1894 .with_test_backoff_policy();
1895 let result = http_client
1896 .error_for_response(response)
1897 .await;
1898
1899 mock.assert();
1900 assert!(matches!(result, Err(RPCError::RateLimited(None))));
1901 }
1902
1903 #[tokio::test]
1904 async fn test_error_for_response_server_errors() {
1905 let test_cases =
1906 vec![(502, "Bad Gateway"), (503, "Service Unavailable"), (504, "Gateway Timeout")];
1907
1908 for (status_code, expected_body) in test_cases {
1909 let mut server = Server::new_async().await;
1910 let mock = server
1911 .mock("GET", "/test")
1912 .with_status(status_code)
1913 .with_body(expected_body)
1914 .create_async()
1915 .await;
1916
1917 let client = reqwest::Client::new();
1918 let response = client
1919 .get(format!("{}/test", server.url()))
1920 .send()
1921 .await
1922 .unwrap();
1923
1924 let http_client = HttpRPCClient::new(server.url().as_str(), None)
1925 .unwrap()
1926 .with_test_backoff_policy();
1927 let result = http_client
1928 .error_for_response(response)
1929 .await;
1930
1931 mock.assert();
1932 assert!(matches!(result, Err(RPCError::ServerUnreachable(_))));
1933 if let Err(RPCError::ServerUnreachable(body)) = result {
1934 assert_eq!(body, expected_body);
1935 }
1936 }
1937 }
1938
1939 #[tokio::test]
1940 async fn test_error_for_response_success() {
1941 let mut server = Server::new_async().await;
1942 let mock = server
1943 .mock("GET", "/test")
1944 .with_status(200)
1945 .with_body("success")
1946 .create_async()
1947 .await;
1948
1949 let client = reqwest::Client::new();
1950 let response = client
1951 .get(format!("{}/test", server.url()))
1952 .send()
1953 .await
1954 .unwrap();
1955
1956 let http_client = HttpRPCClient::new(server.url().as_str(), None)
1957 .unwrap()
1958 .with_test_backoff_policy();
1959 let result = http_client
1960 .error_for_response(response)
1961 .await;
1962
1963 mock.assert();
1964 assert!(result.is_ok());
1965
1966 let response = result.unwrap();
1967 assert_eq!(response.status(), 200);
1968 }
1969
1970 #[tokio::test]
1971 async fn test_handle_error_for_backoff_server_unreachable() {
1972 let http_client = HttpRPCClient::new("http://localhost:8080", None)
1973 .unwrap()
1974 .with_test_backoff_policy();
1975 let error = RPCError::ServerUnreachable("Service down".to_string());
1976
1977 let backoff_error = http_client
1978 .handle_error_for_backoff(error)
1979 .await;
1980
1981 match backoff_error {
1982 backoff::Error::Transient { err: RPCError::ServerUnreachable(msg), retry_after } => {
1983 assert_eq!(msg, "Service down");
1984 assert_eq!(retry_after, Some(Duration::from_millis(50))); }
1986 _ => panic!("Expected transient error for ServerUnreachable"),
1987 }
1988 }
1989
1990 #[tokio::test]
1991 async fn test_handle_error_for_backoff_rate_limited_with_retry_after() {
1992 let http_client = HttpRPCClient::new("http://localhost:8080", None)
1993 .unwrap()
1994 .with_test_backoff_policy();
1995 let future_time = SystemTime::now() + Duration::from_secs(30);
1996 let error = RPCError::RateLimited(Some(future_time));
1997
1998 let backoff_error = http_client
1999 .handle_error_for_backoff(error)
2000 .await;
2001
2002 match backoff_error {
2003 backoff::Error::Transient { err: RPCError::RateLimited(retry_after), .. } => {
2004 assert_eq!(retry_after, Some(future_time));
2005 }
2006 _ => panic!("Expected transient error for RateLimited"),
2007 }
2008
2009 let stored_retry_after = http_client.retry_after.read().await;
2011 assert_eq!(*stored_retry_after, Some(future_time));
2012 }
2013
2014 #[tokio::test]
2015 async fn test_handle_error_for_backoff_rate_limited_no_retry_after() {
2016 let http_client = HttpRPCClient::new("http://localhost:8080", None)
2017 .unwrap()
2018 .with_test_backoff_policy();
2019 let error = RPCError::RateLimited(None);
2020
2021 let backoff_error = http_client
2022 .handle_error_for_backoff(error)
2023 .await;
2024
2025 match backoff_error {
2026 backoff::Error::Transient { err: RPCError::RateLimited(None), .. } => {
2027 }
2029 _ => panic!("Expected transient error for RateLimited without retry-after"),
2030 }
2031 }
2032
2033 #[tokio::test]
2034 async fn test_handle_error_for_backoff_other_errors() {
2035 let http_client = HttpRPCClient::new("http://localhost:8080", None)
2036 .unwrap()
2037 .with_test_backoff_policy();
2038 let error = RPCError::ParseResponse("Invalid JSON".to_string());
2039
2040 let backoff_error = http_client
2041 .handle_error_for_backoff(error)
2042 .await;
2043
2044 match backoff_error {
2045 backoff::Error::Permanent(RPCError::ParseResponse(msg)) => {
2046 assert_eq!(msg, "Invalid JSON");
2047 }
2048 _ => panic!("Expected permanent error for ParseResponse"),
2049 }
2050 }
2051
2052 #[tokio::test]
2053 async fn test_wait_until_retry_after_no_retry_time() {
2054 let http_client = HttpRPCClient::new("http://localhost:8080", None)
2055 .unwrap()
2056 .with_test_backoff_policy();
2057
2058 let start = std::time::Instant::now();
2059 http_client
2060 .wait_until_retry_after()
2061 .await;
2062 let elapsed = start.elapsed();
2063
2064 assert!(elapsed < Duration::from_millis(100));
2066 }
2067
2068 #[tokio::test]
2069 async fn test_wait_until_retry_after_past_time() {
2070 let http_client = HttpRPCClient::new("http://localhost:8080", None)
2071 .unwrap()
2072 .with_test_backoff_policy();
2073
2074 let past_time = SystemTime::now() - Duration::from_secs(10);
2076 *http_client.retry_after.write().await = Some(past_time);
2077
2078 let start = std::time::Instant::now();
2079 http_client
2080 .wait_until_retry_after()
2081 .await;
2082 let elapsed = start.elapsed();
2083
2084 assert!(elapsed < Duration::from_millis(100));
2086 }
2087
2088 #[tokio::test]
2089 async fn test_wait_until_retry_after_future_time() {
2090 let http_client = HttpRPCClient::new("http://localhost:8080", None)
2091 .unwrap()
2092 .with_test_backoff_policy();
2093
2094 let future_time = SystemTime::now() + Duration::from_millis(100);
2096 *http_client.retry_after.write().await = Some(future_time);
2097
2098 let start = std::time::Instant::now();
2099 http_client
2100 .wait_until_retry_after()
2101 .await;
2102 let elapsed = start.elapsed();
2103
2104 assert!(elapsed >= Duration::from_millis(80)); assert!(elapsed <= Duration::from_millis(200)); }
2108
2109 #[tokio::test]
2110 async fn test_make_post_request_success() {
2111 let mut server = Server::new_async().await;
2112 let server_resp = r#"{"success": true}"#;
2113
2114 let mock = server
2115 .mock("POST", "/test")
2116 .with_status(200)
2117 .with_body(server_resp)
2118 .create_async()
2119 .await;
2120
2121 let http_client = HttpRPCClient::new(server.url().as_str(), None)
2122 .unwrap()
2123 .with_test_backoff_policy();
2124 let request_body = serde_json::json!({"test": "data"});
2125 let uri = format!("{}/test", server.url());
2126
2127 let result = http_client
2128 .make_post_request(&request_body, &uri)
2129 .await;
2130
2131 mock.assert();
2132 assert!(result.is_ok());
2133
2134 let response = result.unwrap();
2135 assert_eq!(response.status(), 200);
2136 assert_eq!(response.text().await.unwrap(), server_resp);
2137 }
2138
2139 #[tokio::test]
2140 async fn test_make_post_request_retry_on_server_error() {
2141 let mut server = Server::new_async().await;
2142 let error_mock = server
2144 .mock("POST", "/test")
2145 .with_status(503)
2146 .with_body("Service Unavailable")
2147 .expect(1)
2148 .create_async()
2149 .await;
2150
2151 let success_mock = server
2152 .mock("POST", "/test")
2153 .with_status(200)
2154 .with_body(r#"{"success": true}"#)
2155 .expect(1)
2156 .create_async()
2157 .await;
2158
2159 let http_client = HttpRPCClient::new(server.url().as_str(), None)
2160 .unwrap()
2161 .with_test_backoff_policy();
2162 let request_body = serde_json::json!({"test": "data"});
2163 let uri = format!("{}/test", server.url());
2164
2165 let result = http_client
2166 .make_post_request(&request_body, &uri)
2167 .await;
2168
2169 error_mock.assert();
2170 success_mock.assert();
2171 assert!(result.is_ok());
2172 }
2173
2174 #[tokio::test]
2175 async fn test_make_post_request_respect_retry_after_header() {
2176 let mut server = Server::new_async().await;
2177
2178 let rate_limit_mock = server
2180 .mock("POST", "/test")
2181 .with_status(429)
2182 .with_header("Retry-After", "1") .expect(1)
2184 .create_async()
2185 .await;
2186
2187 let success_mock = server
2188 .mock("POST", "/test")
2189 .with_status(200)
2190 .with_body(r#"{"success": true}"#)
2191 .expect(1)
2192 .create_async()
2193 .await;
2194
2195 let http_client = HttpRPCClient::new(server.url().as_str(), None)
2196 .unwrap()
2197 .with_test_backoff_policy();
2198 let request_body = serde_json::json!({"test": "data"});
2199 let uri = format!("{}/test", server.url());
2200
2201 let start = std::time::Instant::now();
2202 let result = http_client
2203 .make_post_request(&request_body, &uri)
2204 .await;
2205 let elapsed = start.elapsed();
2206
2207 rate_limit_mock.assert();
2208 success_mock.assert();
2209 assert!(result.is_ok());
2210
2211 assert!(elapsed >= Duration::from_millis(900)); assert!(elapsed <= Duration::from_millis(2000)); }
2215
2216 #[tokio::test]
2217 async fn test_make_post_request_permanent_error() {
2218 let mut server = Server::new_async().await;
2219
2220 let mock = server
2221 .mock("POST", "/test")
2222 .with_status(400) .with_body("Bad Request")
2224 .expect(1)
2225 .create_async()
2226 .await;
2227
2228 let http_client = HttpRPCClient::new(server.url().as_str(), None)
2229 .unwrap()
2230 .with_test_backoff_policy();
2231 let request_body = serde_json::json!({"test": "data"});
2232 let uri = format!("{}/test", server.url());
2233
2234 let result = http_client
2235 .make_post_request(&request_body, &uri)
2236 .await;
2237
2238 mock.assert();
2239 assert!(result.is_ok()); let response = result.unwrap();
2242 assert_eq!(response.status(), 400);
2243 }
2244
2245 #[tokio::test]
2246 async fn test_concurrent_requests_with_different_retry_after() {
2247 let mut server = Server::new_async().await;
2248
2249 let rate_limit_mock_1 = server
2251 .mock("POST", "/test1")
2252 .with_status(429)
2253 .with_header("Retry-After", "1")
2254 .expect(1)
2255 .create_async()
2256 .await;
2257
2258 let rate_limit_mock_2 = server
2260 .mock("POST", "/test2")
2261 .with_status(429)
2262 .with_header("Retry-After", "2")
2263 .expect(1)
2264 .create_async()
2265 .await;
2266
2267 let success_mock_1 = server
2269 .mock("POST", "/test1")
2270 .with_status(200)
2271 .with_body(r#"{"result": "success1"}"#)
2272 .expect(1)
2273 .create_async()
2274 .await;
2275
2276 let success_mock_2 = server
2277 .mock("POST", "/test2")
2278 .with_status(200)
2279 .with_body(r#"{"result": "success2"}"#)
2280 .expect(1)
2281 .create_async()
2282 .await;
2283
2284 let http_client = HttpRPCClient::new(server.url().as_str(), None)
2285 .unwrap()
2286 .with_test_backoff_policy();
2287 let request_body = serde_json::json!({"test": "data"});
2288
2289 let uri1 = format!("{}/test1", server.url());
2290 let uri2 = format!("{}/test2", server.url());
2291
2292 let start = std::time::Instant::now();
2294 let (result1, result2) = tokio::join!(
2295 http_client.make_post_request(&request_body, &uri1),
2296 http_client.make_post_request(&request_body, &uri2)
2297 );
2298 let elapsed = start.elapsed();
2299
2300 rate_limit_mock_1.assert();
2301 rate_limit_mock_2.assert();
2302 success_mock_1.assert();
2303 success_mock_2.assert();
2304
2305 assert!(result1.is_ok());
2306 assert!(result2.is_ok());
2307
2308 assert!(elapsed >= Duration::from_millis(1800)); assert!(elapsed <= Duration::from_millis(3000)); let final_retry_after = http_client.retry_after.read().await;
2316 assert!(final_retry_after.is_some());
2317
2318 if let Some(retry_time) = *final_retry_after {
2320 let now = SystemTime::now();
2323 let diff = if retry_time > now {
2324 retry_time.duration_since(now).unwrap()
2325 } else {
2326 now.duration_since(retry_time).unwrap()
2327 };
2328
2329 assert!(diff <= Duration::from_secs(3), "Retry time difference too large: {:?}", diff);
2331 }
2332 }
2333
2334 #[tokio::test]
2335 async fn test_get_snapshots() {
2336 let mut server = Server::new_async().await;
2337
2338 let protocol_states_resp = r#"
2340 {
2341 "states": [
2342 {
2343 "component_id": "component1",
2344 "attributes": {
2345 "attribute_1": "0x00000000000003e8"
2346 },
2347 "balances": {
2348 "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
2349 }
2350 }
2351 ],
2352 "pagination": {
2353 "page": 0,
2354 "page_size": 100,
2355 "total": 1
2356 }
2357 }
2358 "#;
2359
2360 let contract_state_resp = r#"
2362 {
2363 "accounts": [
2364 {
2365 "chain": "ethereum",
2366 "address": "0x1111111111111111111111111111111111111111",
2367 "title": "",
2368 "slots": {},
2369 "native_balance": "0x01f4",
2370 "token_balances": {},
2371 "code": "0x00",
2372 "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
2373 "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2374 "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2375 "creation_tx": null
2376 }
2377 ],
2378 "pagination": {
2379 "page": 0,
2380 "page_size": 100,
2381 "total": 1
2382 }
2383 }
2384 "#;
2385
2386 let tvl_resp = r#"
2388 {
2389 "tvl": {
2390 "component1": 1000000.0
2391 },
2392 "pagination": {
2393 "page": 0,
2394 "page_size": 100,
2395 "total": 1
2396 }
2397 }
2398 "#;
2399
2400 let protocol_states_mock = server
2401 .mock("POST", "/v1/protocol_state")
2402 .expect(1)
2403 .with_body(protocol_states_resp)
2404 .create_async()
2405 .await;
2406
2407 let contract_state_mock = server
2408 .mock("POST", "/v1/contract_state")
2409 .expect(1)
2410 .with_body(contract_state_resp)
2411 .create_async()
2412 .await;
2413
2414 let tvl_mock = server
2415 .mock("POST", "/v1/component_tvl")
2416 .expect(1)
2417 .with_body(tvl_resp)
2418 .create_async()
2419 .await;
2420
2421 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
2422
2423 #[allow(deprecated)]
2424 let component = tycho_common::dto::ProtocolComponent {
2425 id: "component1".to_string(),
2426 protocol_system: "test_protocol".to_string(),
2427 protocol_type_name: "test_type".to_string(),
2428 chain: Chain::Ethereum,
2429 tokens: vec![],
2430 contract_ids: vec![
2431 Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()
2432 ],
2433 static_attributes: HashMap::new(),
2434 change: tycho_common::dto::ChangeType::Creation,
2435 creation_tx: Bytes::from_str(
2436 "0x0000000000000000000000000000000000000000000000000000000000000000",
2437 )
2438 .unwrap(),
2439 created_at: chrono::Utc::now().naive_utc(),
2440 };
2441
2442 let mut components = HashMap::new();
2443 components.insert("component1".to_string(), component);
2444
2445 let contract_ids =
2446 vec![Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()];
2447
2448 let request = SnapshotParameters::new(
2449 Chain::Ethereum,
2450 "test_protocol",
2451 &components,
2452 &contract_ids,
2453 12345,
2454 );
2455
2456 let response = client
2457 .get_snapshots(&request, 100, 4)
2458 .await
2459 .expect("get snapshots");
2460
2461 protocol_states_mock.assert();
2463 contract_state_mock.assert();
2464 tvl_mock.assert();
2465
2466 assert_eq!(response.states.len(), 1);
2468 assert!(response
2469 .states
2470 .contains_key("component1"));
2471
2472 let component_state = response
2474 .states
2475 .get("component1")
2476 .unwrap();
2477 assert_eq!(component_state.component_tvl, Some(1000000.0));
2478
2479 assert_eq!(response.vm_storage.len(), 1);
2481 let contract_addr = Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap();
2482 assert!(response
2483 .vm_storage
2484 .contains_key(&contract_addr));
2485 }
2486
2487 #[tokio::test]
2488 async fn test_get_snapshots_empty_components() {
2489 let server = Server::new_async().await;
2490 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
2491
2492 let components = HashMap::new();
2493 let contract_ids = vec![];
2494
2495 let request = SnapshotParameters::new(
2496 Chain::Ethereum,
2497 "test_protocol",
2498 &components,
2499 &contract_ids,
2500 12345,
2501 );
2502
2503 let response = client
2504 .get_snapshots(&request, 100, 4)
2505 .await
2506 .expect("get snapshots");
2507
2508 assert!(response.states.is_empty());
2510 assert!(response.vm_storage.is_empty());
2511 }
2512
2513 #[tokio::test]
2514 async fn test_get_snapshots_without_tvl() {
2515 let mut server = Server::new_async().await;
2516
2517 let protocol_states_resp = r#"
2518 {
2519 "states": [
2520 {
2521 "component_id": "component1",
2522 "attributes": {},
2523 "balances": {}
2524 }
2525 ],
2526 "pagination": {
2527 "page": 0,
2528 "page_size": 100,
2529 "total": 1
2530 }
2531 }
2532 "#;
2533
2534 let protocol_states_mock = server
2535 .mock("POST", "/v1/protocol_state")
2536 .expect(1)
2537 .with_body(protocol_states_resp)
2538 .create_async()
2539 .await;
2540
2541 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
2542
2543 #[allow(deprecated)]
2545 let component = tycho_common::dto::ProtocolComponent {
2546 id: "component1".to_string(),
2547 protocol_system: "test_protocol".to_string(),
2548 protocol_type_name: "test_type".to_string(),
2549 chain: Chain::Ethereum,
2550 tokens: vec![],
2551 contract_ids: vec![],
2552 static_attributes: HashMap::new(),
2553 change: tycho_common::dto::ChangeType::Creation,
2554 creation_tx: Bytes::from_str(
2555 "0x0000000000000000000000000000000000000000000000000000000000000000",
2556 )
2557 .unwrap(),
2558 created_at: chrono::Utc::now().naive_utc(),
2559 };
2560
2561 let mut components = HashMap::new();
2562 components.insert("component1".to_string(), component);
2563 let contract_ids = vec![];
2564
2565 let request = SnapshotParameters::new(
2566 Chain::Ethereum,
2567 "test_protocol",
2568 &components,
2569 &contract_ids,
2570 12345,
2571 )
2572 .include_balances(false)
2573 .include_tvl(false);
2574
2575 let response = client
2576 .get_snapshots(&request, 100, 4)
2577 .await
2578 .expect("get snapshots");
2579
2580 protocol_states_mock.assert();
2582 assert_eq!(response.states.len(), 1);
2586 let component_state = response
2588 .states
2589 .get("component1")
2590 .unwrap();
2591 assert_eq!(component_state.component_tvl, None);
2592 }
2593}