1use std::{
7 collections::HashMap,
8 sync::Arc,
9 time::{Duration, SystemTime},
10};
11
12use async_trait::async_trait;
13use backoff::{exponential::ExponentialBackoffBuilder, ExponentialBackoff};
14use futures03::future::try_join_all;
15#[cfg(test)]
16use mockall::automock;
17use reqwest::{header, Client, ClientBuilder, Response, StatusCode, Url};
18use serde::Serialize;
19use thiserror::Error;
20use time::{format_description::well_known::Rfc2822, OffsetDateTime};
21use tokio::{
22 sync::{RwLock, Semaphore},
23 time::sleep,
24};
25use tracing::{debug, error, instrument, trace, warn};
26use tycho_common::{
27 dto::{
28 ComponentTvlRequestBody, ComponentTvlRequestResponse, PaginationLimits, PaginationParams,
29 ProtocolComponentRequestResponse, ProtocolComponentsRequestBody, ProtocolStateRequestBody,
30 ProtocolStateRequestResponse, ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse,
31 StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
32 TracedEntryPointRequestBody, TracedEntryPointRequestResponse, VersionParam,
33 },
34 models::{
35 blockchain::{EntryPointWithTracingParams, TracedEntryPoints, TracingResult},
36 contract::Account,
37 protocol::{ProtocolComponent, ProtocolComponentState},
38 token::Token,
39 Chain, ComponentId,
40 },
41 Bytes,
42};
43
/// Two lists of protocol names: the protocol systems themselves and the `dci_protocols` list.
///
/// NOTE(review): `dci_protocols` presumably names the protocols that use DCI — confirm against
/// the server response this is built from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolSystems {
    /// Protocol system names.
    protocol_systems: Vec<String>,
    /// Names stored as "DCI protocols".
    dci_protocols: Vec<String>,
}

impl ProtocolSystems {
    /// Bundles the two name lists; crate-internal constructor.
    pub(crate) fn new(protocol_systems: Vec<String>, dci_protocols: Vec<String>) -> Self {
        ProtocolSystems { protocol_systems, dci_protocols }
    }

    /// Borrows the protocol system names as a slice.
    pub fn protocol_systems(&self) -> &[String] {
        self.protocol_systems.as_slice()
    }

    /// Borrows the DCI protocol names as a slice.
    pub fn dci_protocols(&self) -> &[String] {
        self.dci_protocols.as_slice()
    }
}
64
/// One page of a paginated result: the payload plus the pagination counters that came with it.
#[derive(Debug, Clone, PartialEq)]
pub struct Page<T> {
    /// Payload carried by this page.
    data: T,
    /// Total count reported alongside the page.
    total: i64,
    /// Index of this page.
    page: i64,
    /// Page size associated with this page.
    page_size: i64,
}

impl<T> Page<T> {
    /// Wraps `data` together with its pagination counters.
    pub fn new(data: T, total: i64, page: i64, page_size: i64) -> Self {
        Self { data, total, page, page_size }
    }

    /// Borrows the payload.
    pub fn data(&self) -> &T {
        let Self { data, .. } = self;
        data
    }

    /// Consumes the page and returns the payload, dropping the counters.
    pub fn into_data(self) -> T {
        let Self { data, .. } = self;
        data
    }

    /// Total count stored in this page.
    pub fn total(&self) -> i64 {
        self.total
    }

    /// Page index stored in this page.
    pub fn page(&self) -> i64 {
        self.page
    }

    /// Page size stored in this page.
    pub fn page_size(&self) -> i64 {
        self.page_size
    }
}

impl<T> Page<Vec<T>> {
    /// Number of elements in the wrapped vector.
    pub fn len(&self) -> usize {
        self.data().len()
    }

    /// `true` when the wrapped vector holds no elements.
    pub fn is_empty(&self) -> bool {
        self.data().is_empty()
    }
}

impl<K, V, S> Page<HashMap<K, V, S>>
where
    S: std::hash::BuildHasher,
{
    /// Number of entries in the wrapped map.
    pub fn len(&self) -> usize {
        self.data().len()
    }

    /// `true` when the wrapped map holds no entries.
    pub fn is_empty(&self) -> bool {
        self.data().is_empty()
    }
}

/// Iterating a page by value iterates its payload by value.
impl<T: IntoIterator> IntoIterator for Page<T> {
    type Item = T::Item;
    type IntoIter = T::IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        self.into_data().into_iter()
    }
}

/// Iterating a borrowed page iterates the borrowed payload.
impl<'a, T> IntoIterator for &'a Page<T>
where
    &'a T: IntoIterator,
{
    type Item = <&'a T as IntoIterator>::Item;
    type IntoIter = <&'a T as IntoIterator>::IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        IntoIterator::into_iter(&self.data)
    }
}
140
141use crate::{
142 feed::synchronizer::{ComponentWithState, Snapshot},
143 TYCHO_SERVER_VERSION,
144};
145
/// Concurrency value (4) exported for use with the RPC client.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the default number
/// of concurrent requests passed to the paginated helpers — confirm against callers.
pub const RPC_CLIENT_CONCURRENCY: usize = 4;
148
149#[derive(Clone, PartialEq, Debug)]
151pub struct ContractStateParams {
152 chain: Chain,
153 protocol_system: String,
154 contract_ids: Option<Vec<Bytes>>,
155 version: VersionParam,
156 page: i64,
157 page_size: i64,
158}
159
160impl ContractStateParams {
161 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
162 Self {
163 chain,
164 protocol_system: protocol_system.into(),
165 contract_ids: None,
166 version: VersionParam::default(),
167 page: 0,
168 page_size: StateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
169 }
170 }
171
172 pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
173 self.contract_ids = Some(ids);
174 self
175 }
176
177 pub fn with_version(mut self, version: VersionParam) -> Self {
178 self.version = version;
179 self
180 }
181
182 pub fn with_block_number(mut self, block_number: u64) -> Self {
183 self.version = VersionParam::at_block(self.chain.into(), block_number);
184 self
185 }
186
187 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
188 self.page = page;
189 self.page_size = page_size;
190 self
191 }
192}
193
194#[derive(Clone, PartialEq, Debug)]
196pub struct ProtocolComponentsParams {
197 chain: Chain,
198 protocol_system: String,
199 component_ids: Option<Vec<ComponentId>>,
200 tvl_gt: Option<f64>,
201 page: i64,
202 page_size: i64,
203}
204
205impl ProtocolComponentsParams {
206 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
207 Self {
208 chain,
209 protocol_system: protocol_system.into(),
210 component_ids: None,
211 tvl_gt: None,
212 page: 0,
213 page_size: ProtocolComponentsRequestBody::MAX_PAGE_SIZE_COMPRESSED,
214 }
215 }
216
217 pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
218 self.component_ids = Some(ids);
219 self
220 }
221
222 pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
223 self.tvl_gt = Some(tvl_gt);
224 self
225 }
226
227 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
228 self.page = page;
229 self.page_size = page_size;
230 self
231 }
232
233 #[cfg(test)]
234 pub(crate) fn component_ids(&self) -> Option<&Vec<ComponentId>> {
235 self.component_ids.as_ref()
236 }
237}
238
239#[derive(Clone, PartialEq, Debug)]
241pub struct ProtocolStatesParams {
242 chain: Chain,
243 protocol_system: String,
244 protocol_ids: Option<Vec<String>>,
245 include_balances: bool,
246 version: VersionParam,
247 page: i64,
248 page_size: i64,
249}
250
251impl ProtocolStatesParams {
252 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
253 Self {
254 chain,
255 protocol_system: protocol_system.into(),
256 protocol_ids: None,
257 include_balances: false,
258 version: VersionParam::default(),
259 page: 0,
260 page_size: ProtocolStateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
261 }
262 }
263
264 pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
265 self.protocol_ids = Some(ids);
266 self
267 }
268
269 pub fn with_include_balances(mut self, include_balances: bool) -> Self {
270 self.include_balances = include_balances;
271 self
272 }
273
274 pub fn with_version(mut self, version: VersionParam) -> Self {
275 self.version = version;
276 self
277 }
278
279 pub fn with_block_number(mut self, block_number: u64) -> Self {
280 self.version = VersionParam::at_block(self.chain.into(), block_number);
281 self
282 }
283
284 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
285 self.page = page;
286 self.page_size = page_size;
287 self
288 }
289}
290
291#[derive(Clone, PartialEq, Debug)]
293pub struct TokensParams {
294 chain: Chain,
295 min_quality: Option<i32>,
296 traded_n_days_ago: Option<u64>,
297 page: i64,
298 page_size: i64,
299}
300
301impl TokensParams {
302 pub fn new(chain: Chain) -> Self {
303 Self {
304 chain,
305 min_quality: None,
306 traded_n_days_ago: None,
307 page: 0,
308 page_size: TokensRequestBody::MAX_PAGE_SIZE_COMPRESSED,
309 }
310 }
311
312 pub fn with_min_quality(mut self, min_quality: i32) -> Self {
313 self.min_quality = Some(min_quality);
314 self
315 }
316
317 pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
318 self.traded_n_days_ago = Some(days);
319 self
320 }
321
322 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
323 self.page = page;
324 self.page_size = page_size;
325 self
326 }
327}
328
329#[derive(Clone, PartialEq, Debug)]
331pub struct ProtocolSystemsParams {
332 chain: Chain,
333 page: i64,
334 page_size: i64,
335}
336
337impl ProtocolSystemsParams {
338 pub fn new(chain: Chain) -> Self {
339 Self { chain, page: 0, page_size: ProtocolSystemsRequestBody::MAX_PAGE_SIZE_COMPRESSED }
340 }
341
342 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
343 self.page = page;
344 self.page_size = page_size;
345 self
346 }
347}
348
349#[derive(Clone, PartialEq, Debug)]
351pub struct ComponentTvlParams {
352 chain: Chain,
353 protocol_system: Option<String>,
354 component_ids: Option<Vec<String>>,
355 page: i64,
356 page_size: i64,
357}
358
359impl ComponentTvlParams {
360 pub fn new(chain: Chain) -> Self {
361 Self {
362 chain,
363 protocol_system: None,
364 component_ids: None,
365 page: 0,
366 page_size: ComponentTvlRequestBody::MAX_PAGE_SIZE_COMPRESSED,
367 }
368 }
369
370 pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
371 self.protocol_system = Some(protocol_system.into());
372 self
373 }
374
375 pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
376 self.component_ids = Some(ids);
377 self
378 }
379
380 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
381 self.page = page;
382 self.page_size = page_size;
383 self
384 }
385}
386
387#[derive(Clone, PartialEq, Debug)]
389pub struct TracedEntryPointsParams {
390 chain: Chain,
391 protocol_system: String,
392 component_ids: Option<Vec<String>>,
393 page: i64,
394 page_size: i64,
395}
396
397impl TracedEntryPointsParams {
398 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
399 Self {
400 chain,
401 protocol_system: protocol_system.into(),
402 component_ids: None,
403 page: 0,
404 page_size: TracedEntryPointRequestBody::MAX_PAGE_SIZE_COMPRESSED,
405 }
406 }
407
408 pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
409 self.component_ids = Some(ids);
410 self
411 }
412
413 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
414 self.page = page;
415 self.page_size = page_size;
416 self
417 }
418}
419
420#[derive(Clone, PartialEq, Debug)]
422pub struct ProtocolComponentsPaginatedParams {
423 chain: Chain,
424 protocol_system: String,
425 component_ids: Option<Vec<ComponentId>>,
426 tvl_gt: Option<f64>,
427 chunk_size: Option<usize>,
428 concurrency: usize,
429}
430
431impl ProtocolComponentsPaginatedParams {
432 pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
433 Self {
434 chain,
435 protocol_system: protocol_system.into(),
436 component_ids: None,
437 tvl_gt: None,
438 chunk_size: None,
439 concurrency,
440 }
441 }
442
443 pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
444 self.component_ids = Some(ids);
445 self
446 }
447
448 pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
449 self.tvl_gt = Some(tvl_gt);
450 self
451 }
452
453 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
454 self.chunk_size = Some(chunk_size);
455 self
456 }
457}
458
459#[derive(Clone, PartialEq, Debug)]
461pub struct TracedEntryPointsPaginatedParams {
462 chain: Chain,
463 protocol_system: String,
464 component_ids: Vec<String>,
465 chunk_size: Option<usize>,
466 concurrency: usize,
467}
468
469impl TracedEntryPointsPaginatedParams {
470 pub fn new(
471 chain: Chain,
472 protocol_system: impl Into<String>,
473 component_ids: Vec<String>,
474 concurrency: usize,
475 ) -> Self {
476 Self {
477 chain,
478 protocol_system: protocol_system.into(),
479 component_ids,
480 chunk_size: None,
481 concurrency,
482 }
483 }
484
485 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
486 self.chunk_size = Some(chunk_size);
487 self
488 }
489}
490
491#[derive(Clone, PartialEq, Debug)]
493pub struct ProtocolStatesPaginatedParams {
494 chain: Chain,
495 protocol_system: String,
496 protocol_ids: Vec<String>,
497 include_balances: bool,
498 version: VersionParam,
499 chunk_size: Option<usize>,
500 concurrency: usize,
501}
502
503impl ProtocolStatesPaginatedParams {
504 pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
505 Self {
506 chain,
507 protocol_system: protocol_system.into(),
508 protocol_ids: Vec::new(),
509 include_balances: true,
510 version: VersionParam::default(),
511 chunk_size: None,
512 concurrency,
513 }
514 }
515
516 pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
517 self.protocol_ids = ids;
518 self
519 }
520
521 pub fn with_include_balances(mut self, include_balances: bool) -> Self {
522 self.include_balances = include_balances;
523 self
524 }
525
526 pub fn with_version(mut self, version: VersionParam) -> Self {
527 self.version = version;
528 self
529 }
530
531 pub fn with_block_number(mut self, block_number: u64) -> Self {
532 self.version = VersionParam::at_block(self.chain.into(), block_number);
533 self
534 }
535
536 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
537 self.chunk_size = Some(chunk_size);
538 self
539 }
540}
541
542#[derive(Clone, PartialEq, Debug)]
544pub struct AllTokensParams {
545 chain: Chain,
546 min_quality: Option<i32>,
547 traded_n_days_ago: Option<u64>,
548 chunk_size: Option<usize>,
549 concurrency: usize,
550}
551
552impl AllTokensParams {
553 pub fn new(chain: Chain, concurrency: usize) -> Self {
554 Self { chain, min_quality: None, traded_n_days_ago: None, chunk_size: None, concurrency }
555 }
556
557 pub fn with_min_quality(mut self, min_quality: i32) -> Self {
558 self.min_quality = Some(min_quality);
559 self
560 }
561
562 pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
563 self.traded_n_days_ago = Some(days);
564 self
565 }
566
567 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
568 self.chunk_size = Some(chunk_size);
569 self
570 }
571}
572
573#[derive(Clone, PartialEq, Debug)]
575pub struct ComponentTvlPaginatedParams {
576 chain: Chain,
577 protocol_system: Option<String>,
578 component_ids: Option<Vec<String>>,
579 chunk_size: Option<usize>,
580 concurrency: usize,
581}
582
583impl ComponentTvlPaginatedParams {
584 pub fn new(chain: Chain, concurrency: usize) -> Self {
585 Self { chain, protocol_system: None, component_ids: None, chunk_size: None, concurrency }
586 }
587
588 pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
589 self.protocol_system = Some(protocol_system.into());
590 self
591 }
592
593 pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
594 self.component_ids = Some(ids);
595 self
596 }
597
598 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
599 self.chunk_size = Some(chunk_size);
600 self
601 }
602}
603
604#[derive(Clone, PartialEq, Debug)]
606pub struct ContractStatePaginatedParams {
607 chain: Chain,
608 protocol_system: String,
609 contract_ids: Vec<Bytes>,
610 version: VersionParam,
611 chunk_size: Option<usize>,
612 concurrency: usize,
613}
614
615impl ContractStatePaginatedParams {
616 pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
617 Self {
618 chain,
619 protocol_system: protocol_system.into(),
620 contract_ids: Vec::new(),
621 version: VersionParam::default(),
622 chunk_size: None,
623 concurrency,
624 }
625 }
626
627 pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
628 self.contract_ids = ids;
629 self
630 }
631
632 pub fn with_version(mut self, version: VersionParam) -> Self {
633 self.version = version;
634 self
635 }
636
637 pub fn with_block_number(mut self, block_number: u64) -> Self {
638 self.version = VersionParam::at_block(self.chain.into(), block_number);
639 self
640 }
641
642 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
643 self.chunk_size = Some(chunk_size);
644 self
645 }
646}
647
648#[derive(Clone, Debug, PartialEq)]
653pub struct SnapshotParameters<'a> {
654 pub chain: Chain,
656 pub protocol_system: &'a str,
658 pub components: &'a HashMap<ComponentId, ProtocolComponent>,
660 pub entrypoints: Option<&'a TracedEntryPoints>,
662 pub contract_ids: &'a [Bytes],
664 pub block_number: u64,
666 pub include_balances: bool,
668 pub include_tvl: bool,
670}
671
672impl<'a> SnapshotParameters<'a> {
673 pub fn new(
674 chain: Chain,
675 protocol_system: &'a str,
676 components: &'a HashMap<ComponentId, ProtocolComponent>,
677 contract_ids: &'a [Bytes],
678 block_number: u64,
679 ) -> Self {
680 Self {
681 chain,
682 protocol_system,
683 components,
684 entrypoints: None,
685 contract_ids,
686 block_number,
687 include_balances: true,
688 include_tvl: true,
689 }
690 }
691
692 pub fn include_balances(mut self, include_balances: bool) -> Self {
694 self.include_balances = include_balances;
695 self
696 }
697
698 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
700 self.include_tvl = include_tvl;
701 self
702 }
703
704 pub fn entrypoints(mut self, entrypoints: &'a TracedEntryPoints) -> Self {
705 self.entrypoints = Some(entrypoints);
706 self
707 }
708}
709
/// Errors produced by the RPC client.
#[derive(Error, Debug)]
pub enum RPCError {
    /// The base URL could not be parsed. Fields: the offending URL string and the parser's
    /// error message.
    #[error("Failed to parse URL: {0}. Error: {1}")]
    UrlParsing(String, String),

    /// A request (or one of its headers/parameters) could not be built.
    #[error("Failed to format request: {0}")]
    FormatRequest(String),

    /// The HTTP client reported an error. Fields: a message and the underlying
    /// `reqwest::Error`, which is exposed as the error source.
    #[error("Unexpected HTTP client error: {0}")]
    HttpClient(String, #[source] reqwest::Error),

    /// A response body could not be parsed.
    #[error("Failed to parse response: {0}")]
    ParseResponse(String),

    /// The response body indicated a stale block (see `RPCError::from_parse_error`).
    #[error("Snapshot block is stale: {0}")]
    StaleBlock(String),

    /// The response body started with "Unknown extractor:" (see `RPCError::from_parse_error`).
    #[error("Unknown extractor: {0}")]
    UnknownExtractor(String),

    /// An unrecoverable error, e.g. a closed semaphore in the pagination helpers.
    #[error("Fatal error: {0}")]
    Fatal(String),

    /// The server answered 429 Too Many Requests; carries the parsed `Retry-After` time if one
    /// was present and parseable.
    #[error("Rate limited until {0:?}")]
    RateLimited(Option<SystemTime>),

    /// The server answered 502, 503 or 504; carries the response body text.
    #[error("Server unreachable: {0}")]
    ServerUnreachable(String),
}
746
747impl RPCError {
748 fn from_parse_error(err: serde_json::Error, body: &str) -> Self {
758 if body.contains("version is older than") || body.contains("Could not find Block") {
759 RPCError::StaleBlock(body.to_string())
760 } else if body.starts_with("Unknown extractor:") {
761 RPCError::UnknownExtractor(body.to_string())
762 } else {
763 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
764 }
765 }
766}
767
768#[cfg_attr(test, automock)]
769#[async_trait]
770pub trait RPCClient: Send + Sync {
    /// Reports the client's compression flag.
    ///
    /// The default pagination helpers pass this value to `effective_max_page_size` to pick a
    /// chunk size when the caller does not supply one.
    fn compression(&self) -> bool;
773
    /// Required method: performs a single contract state request described by `params`.
    ///
    /// Resolves to one [`Page`] of [`Account`] values or an [`RPCError`]. It is called once per
    /// chunk by `get_contract_state_paginated`.
    async fn get_contract_state(
        &self,
        params: ContractStateParams,
    ) -> Result<Page<Vec<Account>>, RPCError>;
781
782 async fn get_contract_state_paginated(
786 &self,
787 params: ContractStatePaginatedParams,
788 ) -> Result<Vec<Account>, RPCError> {
789 let semaphore = Arc::new(Semaphore::new(params.concurrency));
790
791 let mut sorted_ids = params.contract_ids;
793 sorted_ids.sort();
794
795 let chunk_size = params
796 .chunk_size
797 .unwrap_or(StateRequestBody::effective_max_page_size(self.compression()) as usize);
798
799 let mut tasks = Vec::new();
800 for chunk in sorted_ids.chunks(chunk_size) {
801 let sem = semaphore.clone();
802 let base_params =
803 ContractStateParams::new(params.chain, params.protocol_system.as_str())
804 .with_contract_ids(chunk.to_vec())
805 .with_version(params.version.clone())
806 .with_pagination(0, chunk_size as i64);
807 tasks.push(async move {
808 let _permit = sem
809 .acquire()
810 .await
811 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
812 self.get_contract_state(base_params)
813 .await
814 });
815 }
816
817 let pages = try_join_all(tasks).await?;
818
819 let accounts = pages
820 .into_iter()
821 .flat_map(|p| p.into_iter())
822 .collect();
823
824 Ok(accounts)
825 }
826
    /// Required method: performs a single protocol components request described by `params`.
    ///
    /// Resolves to one [`Page`] of [`ProtocolComponent`] values or an [`RPCError`]. The page's
    /// `total()` is used by `get_protocol_components_paginated` to compute how many pages to
    /// fetch.
    async fn get_protocol_components(
        &self,
        params: ProtocolComponentsParams,
    ) -> Result<Page<Vec<ProtocolComponent>>, RPCError>;
834
835 async fn get_protocol_components_paginated(
839 &self,
840 params: ProtocolComponentsPaginatedParams,
841 ) -> Result<Vec<ProtocolComponent>, RPCError> {
842 let chain = params.chain;
843 let protocol_system = params.protocol_system;
844 let component_ids = params.component_ids;
845 let tvl_gt = params.tvl_gt;
846 let chunk_size = params.chunk_size;
847 let concurrency = params.concurrency;
848
849 let semaphore = Arc::new(Semaphore::new(concurrency));
850
851 let chunk_size = chunk_size.unwrap_or(
852 ProtocolComponentsRequestBody::effective_max_page_size(self.compression()) as usize,
853 );
854
855 match component_ids {
858 Some(ids) => {
859 let tasks: Vec<_> =
860 ids.chunks(chunk_size)
861 .enumerate()
862 .map(|(index, chunk)| {
863 let sem = semaphore.clone();
864 let mut base =
865 ProtocolComponentsParams::new(chain, protocol_system.as_str())
866 .with_component_ids(chunk.to_vec())
867 .with_pagination(index as i64, chunk_size as i64);
868 if let Some(tvl) = tvl_gt {
869 base = base.with_tvl_gt(tvl);
870 }
871 async move {
872 let _permit = sem.acquire().await.map_err(|_| {
873 RPCError::Fatal("Semaphore dropped".to_string())
874 })?;
875 self.get_protocol_components(base).await
876 }
877 })
878 .collect();
879
880 try_join_all(tasks)
881 .await
882 .map(|pages| pages.into_iter().flatten().collect())
883 }
884 None => {
885 let mut base_params =
888 ProtocolComponentsParams::new(chain, protocol_system.as_str())
889 .with_pagination(0, chunk_size as i64);
890 if let Some(tvl) = tvl_gt {
891 base_params = base_params.with_tvl_gt(tvl);
892 }
893
894 let first_page = self
895 .get_protocol_components(base_params)
896 .await?;
897
898 let total_items = first_page.total();
899 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
900
901 let mut all: Vec<ProtocolComponent> = first_page.into_data();
902
903 let mut page = 1;
904 while page < total_pages {
905 let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
906
907 let tasks: Vec<_> = (0..requests_in_this_iteration)
908 .map(|iter| {
909 let sem = semaphore.clone();
910 let mut p =
911 ProtocolComponentsParams::new(chain, protocol_system.as_str())
912 .with_pagination(page + iter, chunk_size as i64);
913 if let Some(tvl) = tvl_gt {
914 p = p.with_tvl_gt(tvl);
915 }
916 async move {
917 let _permit = sem.acquire().await.map_err(|_| {
918 RPCError::Fatal("Semaphore dropped".to_string())
919 })?;
920 self.get_protocol_components(p).await
921 }
922 })
923 .collect();
924
925 let responses = try_join_all(tasks).await?;
926
927 for resp in responses {
928 all.extend(resp);
929 }
930
931 page += requests_in_this_iteration;
932 }
933 Ok(all)
934 }
935 }
936 }
937
    /// Required method: performs a single protocol states request described by `params`.
    ///
    /// Resolves to one [`Page`] of [`ProtocolComponentState`] values or an [`RPCError`]. It is
    /// called once per chunk by `get_protocol_states_paginated`.
    async fn get_protocol_states(
        &self,
        params: ProtocolStatesParams,
    ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError>;
945
946 async fn get_protocol_states_paginated(
950 &self,
951 params: ProtocolStatesPaginatedParams,
952 ) -> Result<Vec<ProtocolComponentState>, RPCError> {
953 let semaphore = Arc::new(Semaphore::new(params.concurrency));
954
955 let chunk_size =
956 params
957 .chunk_size
958 .unwrap_or(
959 ProtocolStateRequestBody::effective_max_page_size(self.compression()) as usize
960 );
961
962 let tasks: Vec<_> = params
963 .protocol_ids
964 .chunks(chunk_size)
965 .map(|c| {
966 let sem = semaphore.clone();
967 let p = ProtocolStatesParams::new(params.chain, params.protocol_system.as_str())
968 .with_protocol_ids(c.to_vec())
969 .with_include_balances(params.include_balances)
970 .with_version(params.version.clone())
971 .with_pagination(0, chunk_size as i64);
972 async move {
973 let _permit = sem
974 .acquire()
975 .await
976 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
977 self.get_protocol_states(p).await
978 }
979 })
980 .collect();
981
982 try_join_all(tasks)
983 .await
984 .map(|pages| pages.into_iter().flatten().collect())
985 }
986
    /// Required method: performs a single tokens request described by `params`.
    ///
    /// Resolves to one [`Page`] of [`Token`] values or an [`RPCError`]. The page's `total()` is
    /// used by `get_all_tokens` to compute how many pages to fetch.
    async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError>;
991
992 async fn get_all_tokens(&self, params: AllTokensParams) -> Result<Vec<Token>, RPCError> {
996 let chunk_size = params
997 .chunk_size
998 .unwrap_or(TokensRequestBody::effective_max_page_size(self.compression()) as usize);
999
1000 let semaphore = Arc::new(Semaphore::new(params.concurrency));
1001
1002 let page_size: i64 = chunk_size.try_into().map_err(|_| {
1003 RPCError::FormatRequest("Failed to convert chunk_size into i64".to_string())
1004 })?;
1005
1006 let mut base_params = TokensParams::new(params.chain).with_pagination(0, page_size);
1007 if let Some(q) = params.min_quality {
1008 base_params = base_params.with_min_quality(q);
1009 }
1010 if let Some(d) = params.traded_n_days_ago {
1011 base_params = base_params.with_traded_n_days_ago(d);
1012 }
1013
1014 let first_page = self.get_tokens(base_params).await?;
1015 let total_pages = (first_page.total() as f64 / chunk_size as f64).ceil() as i64;
1016
1017 let mut all_tokens: Vec<Token> = first_page.into_data();
1018
1019 if total_pages <= 1 {
1020 return Ok(all_tokens);
1021 }
1022
1023 let tasks: Vec<_> = (1..total_pages)
1024 .map(|page| {
1025 let sem = semaphore.clone();
1026 let mut p = TokensParams::new(params.chain).with_pagination(page, page_size);
1027 if let Some(q) = params.min_quality {
1028 p = p.with_min_quality(q);
1029 }
1030 if let Some(d) = params.traded_n_days_ago {
1031 p = p.with_traded_n_days_ago(d);
1032 }
1033 async move {
1034 let _permit = sem
1035 .acquire()
1036 .await
1037 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1038 self.get_tokens(p).await
1039 }
1040 })
1041 .collect();
1042
1043 let pages = try_join_all(tasks).await?;
1044 for page in pages {
1045 all_tokens.extend(page);
1046 }
1047
1048 Ok(all_tokens)
1049 }
1050
    /// Required method: performs a single protocol systems request described by `params`.
    ///
    /// Resolves to a [`Page`] wrapping a [`ProtocolSystems`] value or an [`RPCError`].
    async fn get_protocol_systems(
        &self,
        params: ProtocolSystemsParams,
    ) -> Result<Page<ProtocolSystems>, RPCError>;
1056
    /// Required method: performs a single component TVL request described by `params`.
    ///
    /// Resolves to one [`Page`] holding a map from `String` keys to `f64` values, or an
    /// [`RPCError`]. NOTE(review): keys are presumably component ids — confirm. The page's
    /// `total()` is used by `get_component_tvl_paginated` to compute how many pages to fetch.
    async fn get_component_tvl(
        &self,
        params: ComponentTvlParams,
    ) -> Result<Page<HashMap<String, f64>>, RPCError>;
1064
1065 async fn get_component_tvl_paginated(
1069 &self,
1070 params: ComponentTvlPaginatedParams,
1071 ) -> Result<HashMap<String, f64>, RPCError> {
1072 let semaphore = Arc::new(Semaphore::new(params.concurrency));
1073
1074 let chunk_size =
1075 params
1076 .chunk_size
1077 .unwrap_or(
1078 ComponentTvlRequestBody::effective_max_page_size(self.compression()) as usize
1079 );
1080
1081 match params.component_ids {
1082 Some(ids) => {
1083 let tasks: Vec<_> =
1084 ids.chunks(chunk_size)
1085 .enumerate()
1086 .map(|(index, chunk)| {
1087 let sem = semaphore.clone();
1088 let mut p = ComponentTvlParams::new(params.chain)
1089 .with_component_ids(chunk.to_vec())
1090 .with_pagination(index as i64, chunk_size as i64);
1091 if let Some(ref ps) = params.protocol_system {
1092 p = p.with_protocol_system(ps.as_str());
1093 }
1094 async move {
1095 let _permit = sem.acquire().await.map_err(|_| {
1096 RPCError::Fatal("Semaphore dropped".to_string())
1097 })?;
1098 self.get_component_tvl(p).await
1099 }
1100 })
1101 .collect();
1102
1103 let pages = try_join_all(tasks).await?;
1104
1105 let mut merged_tvl = HashMap::new();
1106 for page in pages {
1107 for (key, value) in page {
1108 *merged_tvl.entry(key).or_insert(0.0) = value;
1109 }
1110 }
1111
1112 Ok(merged_tvl)
1113 }
1114 None => {
1115 let mut base =
1116 ComponentTvlParams::new(params.chain).with_pagination(0, chunk_size as i64);
1117 if let Some(ref ps) = params.protocol_system {
1118 base = base.with_protocol_system(ps.as_str());
1119 }
1120
1121 let first_page = self.get_component_tvl(base).await?;
1122 let total_items = first_page.total();
1123 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
1124
1125 let mut merged_tvl: HashMap<String, f64> = first_page.into_data();
1126
1127 let mut page = 1;
1128 while page < total_pages {
1129 let requests_in_this_iteration =
1130 (total_pages - page).min(params.concurrency as i64);
1131
1132 let tasks: Vec<_> = (0..requests_in_this_iteration)
1133 .map(|i| {
1134 let sem = semaphore.clone();
1135 let mut p = ComponentTvlParams::new(params.chain)
1136 .with_pagination(page + i, chunk_size as i64);
1137 if let Some(ref ps) = params.protocol_system {
1138 p = p.with_protocol_system(ps.as_str());
1139 }
1140 async move {
1141 let _permit = sem.acquire().await.map_err(|_| {
1142 RPCError::Fatal("Semaphore dropped".to_string())
1143 })?;
1144 self.get_component_tvl(p).await
1145 }
1146 })
1147 .collect();
1148
1149 let responses = try_join_all(tasks).await?;
1150
1151 for resp in responses {
1152 for (key, value) in resp {
1153 *merged_tvl.entry(key).or_insert(0.0) = value;
1154 }
1155 }
1156
1157 page += requests_in_this_iteration;
1158 }
1159
1160 Ok(merged_tvl)
1161 }
1162 }
1163 }
1164
    /// Required method: performs a single traced entry points request described by `params`.
    ///
    /// Resolves to a [`Page`] wrapping a [`TracedEntryPoints`] value or an [`RPCError`]. It is
    /// called once per chunk by `get_traced_entry_points_paginated`.
    async fn get_traced_entry_points(
        &self,
        params: TracedEntryPointsParams,
    ) -> Result<Page<TracedEntryPoints>, RPCError>;
1172
1173 async fn get_traced_entry_points_paginated(
1177 &self,
1178 params: TracedEntryPointsPaginatedParams,
1179 ) -> Result<TracedEntryPoints, RPCError> {
1180 let chain = params.chain;
1181 let protocol_system = params.protocol_system;
1182 let component_ids = params.component_ids;
1183 let chunk_size = params.chunk_size;
1184 let concurrency = params.concurrency;
1185
1186 let semaphore = Arc::new(Semaphore::new(concurrency));
1187
1188 let chunk_size = chunk_size.unwrap_or(
1189 TracedEntryPointRequestBody::effective_max_page_size(self.compression()) as usize,
1190 );
1191
1192 let tasks: Vec<_> = component_ids
1193 .chunks(chunk_size)
1194 .map(|c| {
1195 let sem = semaphore.clone();
1196 let params = TracedEntryPointsParams::new(chain, protocol_system.as_str())
1197 .with_component_ids(c.to_vec())
1198 .with_pagination(0, chunk_size as i64);
1199 async move {
1200 let _permit = sem
1201 .acquire()
1202 .await
1203 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1204 self.get_traced_entry_points(params)
1205 .await
1206 }
1207 })
1208 .collect();
1209
1210 try_join_all(tasks)
1211 .await
1212 .map(|pages| pages.into_iter().flatten().collect())
1213 }
1214
    /// Required method: builds a [`Snapshot`] from the inputs in `request`.
    ///
    /// `chunk_size` and `concurrency` are supplied by the caller. NOTE(review): they presumably
    /// play the same role as in the `*_paginated` helpers (request chunking and the number of
    /// concurrent requests) — confirm against the implementation.
    ///
    /// The lint allowance is needed because the explicit `'a` lifetime is flagged by
    /// `clippy::extra_unused_lifetimes`.
    #[allow(clippy::extra_unused_lifetimes)]
    async fn get_snapshots<'a>(
        &self,
        request: &SnapshotParameters<'a>,
        chunk_size: Option<usize>,
        concurrency: usize,
    ) -> Result<Snapshot, RPCError>;
1224}
1225
/// Construction options for [`HttpRPCClient`].
#[derive(Debug, Clone)]
pub struct HttpRPCClientOptions {
    /// Optional key sent as the `Authorization` header by `HttpRPCClient::new`.
    pub auth_key: Option<String>,
    /// Compression flag; when `false`, `HttpRPCClient::new` disables zstd on the HTTP client.
    pub compression: bool,
}

impl Default for HttpRPCClientOptions {
    /// Same as [`HttpRPCClientOptions::new`].
    fn default() -> Self {
        HttpRPCClientOptions::new()
    }
}

impl HttpRPCClientOptions {
    /// Options with no auth key and compression switched on.
    pub fn new() -> Self {
        HttpRPCClientOptions { auth_key: None, compression: true }
    }

    /// Replaces the auth key (pass `None` to clear it).
    pub fn with_auth_key(self, auth_key: Option<String>) -> Self {
        Self { auth_key, ..self }
    }

    /// Sets the compression flag.
    pub fn with_compression(self, compression: bool) -> Self {
        Self { compression, ..self }
    }
}
1260
/// HTTP-based RPC client.
#[derive(Debug, Clone)]
pub struct HttpRPCClient {
    /// Underlying `reqwest` client, built with the default headers set up in `new`.
    http_client: Client,
    /// Parsed base URL.
    url: Url,
    /// Shared, lock-protected optional point in time; starts as `None`.
    /// NOTE(review): presumably the time until which requests are held back after a rate
    /// limit — confirm against the request path.
    retry_after: Arc<RwLock<Option<SystemTime>>>,
    /// Exponential backoff configuration used for retries.
    backoff_policy: ExponentialBackoff,
    /// Delay used as the retry interval when the server is reported unreachable.
    server_restart_duration: Duration,
    /// Compression flag copied from `HttpRPCClientOptions`.
    compression: bool,
}
1270
1271impl HttpRPCClient {
1272 pub fn new(base_uri: &str, options: HttpRPCClientOptions) -> Result<Self, RPCError> {
1273 let uri = base_uri
1274 .parse::<Url>()
1275 .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
1276
1277 let mut headers = header::HeaderMap::new();
1279 headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
1280 let user_agent = format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION"));
1281 headers.insert(
1282 header::USER_AGENT,
1283 header::HeaderValue::from_str(&user_agent)
1284 .map_err(|e| RPCError::FormatRequest(format!("Invalid user agent format: {e}")))?,
1285 );
1286
1287 if let Some(key) = options.auth_key.as_deref() {
1289 let mut auth_value = header::HeaderValue::from_str(key).map_err(|e| {
1290 RPCError::FormatRequest(format!("Invalid authorization key format: {e}"))
1291 })?;
1292 auth_value.set_sensitive(true);
1293 headers.insert(header::AUTHORIZATION, auth_value);
1294 }
1295
1296 let mut client_builder = ClientBuilder::new()
1297 .default_headers(headers)
1298 .http2_prior_knowledge();
1299
1300 if !options.compression {
1302 client_builder = client_builder.no_zstd();
1303 }
1304
1305 let client = client_builder
1306 .build()
1307 .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1308
1309 Ok(Self {
1310 http_client: client,
1311 url: uri,
1312 retry_after: Arc::new(RwLock::new(None)),
1313 backoff_policy: ExponentialBackoffBuilder::new()
1314 .with_initial_interval(Duration::from_millis(250))
1315 .with_multiplier(1.75)
1317 .with_max_interval(Duration::from_secs(30))
1319 .with_max_elapsed_time(Some(Duration::from_secs(125)))
1321 .build(),
1322 server_restart_duration: Duration::from_secs(120),
1323 compression: options.compression,
1324 })
1325 }
1326
1327 #[cfg(test)]
1328 pub fn with_test_backoff_policy(mut self) -> Self {
1329 self.backoff_policy = ExponentialBackoffBuilder::new()
1331 .with_initial_interval(Duration::from_millis(1))
1332 .with_multiplier(1.1)
1333 .with_max_interval(Duration::from_millis(5))
1334 .with_max_elapsed_time(Some(Duration::from_millis(50)))
1335 .build();
1336 self.server_restart_duration = Duration::from_millis(50);
1337 self
1338 }
1339
1340 async fn error_for_response(
1346 &self,
1347 response: reqwest::Response,
1348 ) -> Result<reqwest::Response, RPCError> {
1349 match response.status() {
1350 StatusCode::TOO_MANY_REQUESTS => {
1351 let retry_after_raw = response
1352 .headers()
1353 .get(reqwest::header::RETRY_AFTER)
1354 .and_then(|h| h.to_str().ok())
1355 .and_then(parse_retry_value);
1356
1357 let reason = response
1358 .text()
1359 .await
1360 .unwrap_or_default();
1361 warn!(reason, retry_after = ?retry_after_raw, "Rate limited by server");
1362
1363 Err(RPCError::RateLimited(retry_after_raw))
1364 }
1365 StatusCode::BAD_GATEWAY |
1366 StatusCode::SERVICE_UNAVAILABLE |
1367 StatusCode::GATEWAY_TIMEOUT => Err(RPCError::ServerUnreachable(
1368 response
1369 .text()
1370 .await
1371 .unwrap_or_else(|_| "Server Unreachable".to_string()),
1372 )),
1373 _ => Ok(response),
1374 }
1375 }
1376
1377 async fn handle_error_for_backoff(&self, e: RPCError) -> backoff::Error<RPCError> {
1383 match e {
1384 RPCError::ServerUnreachable(_) => {
1385 backoff::Error::retry_after(e, self.server_restart_duration)
1386 }
1387 RPCError::RateLimited(Some(until)) => {
1388 let mut retry_after_guard = self.retry_after.write().await;
1389 *retry_after_guard = Some(
1390 retry_after_guard
1391 .unwrap_or(until)
1392 .max(until),
1393 );
1394
1395 if let Ok(duration) = until.duration_since(SystemTime::now()) {
1396 backoff::Error::retry_after(e, duration)
1397 } else {
1398 e.into()
1399 }
1400 }
1401 RPCError::RateLimited(None) => e.into(),
1402 _ => backoff::Error::permanent(e),
1403 }
1404 }
1405
1406 async fn wait_until_retry_after(&self) {
1411 if let Some(&until) = self.retry_after.read().await.as_ref() {
1412 let now = SystemTime::now();
1413 if until > now {
1414 if let Ok(duration) = until.duration_since(now) {
1415 sleep(duration).await
1416 }
1417 }
1418 }
1419 }
1420
1421 async fn make_post_request<T: Serialize + ?Sized>(
1426 &self,
1427 request: &T,
1428 uri: &String,
1429 ) -> Result<Response, RPCError> {
1430 self.wait_until_retry_after().await;
1431 let response = backoff::future::retry(self.backoff_policy.clone(), || async {
1432 let server_response = self
1433 .http_client
1434 .post(uri)
1435 .json(request)
1436 .send()
1437 .await
1438 .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1439
1440 match self
1441 .error_for_response(server_response)
1442 .await
1443 {
1444 Ok(response) => Ok(response),
1445 Err(e) => Err(self.handle_error_for_backoff(e).await),
1446 }
1447 })
1448 .await?;
1449 Ok(response)
1450 }
1451}
1452
1453fn parse_retry_value(val: &str) -> Option<SystemTime> {
1454 if let Ok(secs) = val.parse::<u64>() {
1455 return Some(SystemTime::now() + Duration::from_secs(secs));
1456 }
1457 if let Ok(date) = OffsetDateTime::parse(val, &Rfc2822) {
1458 return Some(date.into());
1459 }
1460 None
1461}
1462
1463#[async_trait]
1464impl RPCClient for HttpRPCClient {
1465 fn compression(&self) -> bool {
1466 self.compression
1467 }
1468
1469 #[instrument(skip(self))]
1470 async fn get_contract_state(
1471 &self,
1472 params: ContractStateParams,
1473 ) -> Result<Page<Vec<Account>>, RPCError> {
1474 if params
1475 .contract_ids
1476 .as_ref()
1477 .is_none_or(|ids| ids.is_empty())
1478 {
1479 warn!("No contract ids specified in request.");
1480 }
1481
1482 let request = StateRequestBody {
1483 contract_ids: params.contract_ids,
1484 protocol_system: params.protocol_system,
1485 chain: params.chain.into(),
1486 version: params.version,
1487 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1488 };
1489
1490 let uri = format!(
1491 "{}/{}/contract_state",
1492 self.url
1493 .to_string()
1494 .trim_end_matches('/'),
1495 TYCHO_SERVER_VERSION
1496 );
1497 debug!(%uri, "Sending contract_state request to Tycho server");
1498 trace!(?request, "Sending request to Tycho server");
1499 let response = self
1500 .make_post_request(&request, &uri)
1501 .await?;
1502 trace!(?response, "Received response from Tycho server");
1503
1504 let body = response
1505 .text()
1506 .await
1507 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1508 if body.is_empty() {
1509 return Ok(Page::new(vec![], 0, 0, 0));
1511 }
1512
1513 let dto_response = serde_json::from_str::<StateRequestResponse>(&body)
1514 .map_err(|err| RPCError::from_parse_error(err, &body))?;
1515 trace!(?dto_response, "Received contract_state response from Tycho server");
1516
1517 let data: Vec<Account> = dto_response
1518 .accounts
1519 .into_iter()
1520 .map(Account::from)
1521 .collect();
1522 Ok(Page::new(
1523 data,
1524 dto_response.pagination.total,
1525 dto_response.pagination.page,
1526 dto_response.pagination.page_size,
1527 ))
1528 }
1529
1530 async fn get_protocol_components(
1531 &self,
1532 params: ProtocolComponentsParams,
1533 ) -> Result<Page<Vec<ProtocolComponent>>, RPCError> {
1534 let request = ProtocolComponentsRequestBody {
1535 protocol_system: params.protocol_system,
1536 component_ids: params.component_ids,
1537 tvl_gt: params.tvl_gt,
1538 chain: params.chain.into(),
1539 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1540 };
1541
1542 let uri = format!(
1543 "{}/{}/protocol_components",
1544 self.url
1545 .to_string()
1546 .trim_end_matches('/'),
1547 TYCHO_SERVER_VERSION,
1548 );
1549 debug!(%uri, "Sending protocol_components request to Tycho server");
1550 trace!(?request, "Sending request to Tycho server");
1551
1552 let response = self
1553 .make_post_request(&request, &uri)
1554 .await?;
1555
1556 trace!(?response, "Received response from Tycho server");
1557
1558 let body = response
1559 .text()
1560 .await
1561 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1562 let dto_response = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
1563 .map_err(|err| RPCError::from_parse_error(err, &body))?;
1564 trace!(?dto_response, "Received protocol_components response from Tycho server");
1565
1566 let data: Vec<ProtocolComponent> = dto_response
1567 .protocol_components
1568 .into_iter()
1569 .map(ProtocolComponent::from)
1570 .collect();
1571 Ok(Page::new(
1572 data,
1573 dto_response.pagination.total,
1574 dto_response.pagination.page,
1575 dto_response.pagination.page_size,
1576 ))
1577 }
1578
1579 async fn get_protocol_states(
1580 &self,
1581 params: ProtocolStatesParams,
1582 ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError> {
1583 if params
1584 .protocol_ids
1585 .as_ref()
1586 .is_none_or(|ids| ids.is_empty())
1587 {
1588 warn!("No protocol ids specified in request.");
1589 }
1590
1591 let request = ProtocolStateRequestBody {
1592 protocol_ids: params.protocol_ids,
1593 protocol_system: params.protocol_system,
1594 chain: params.chain.into(),
1595 include_balances: params.include_balances,
1596 version: params.version,
1597 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1598 };
1599
1600 let uri = format!(
1601 "{}/{}/protocol_state",
1602 self.url
1603 .to_string()
1604 .trim_end_matches('/'),
1605 TYCHO_SERVER_VERSION
1606 );
1607 debug!(%uri, "Sending protocol_states request to Tycho server");
1608 trace!(?request, "Sending request to Tycho server");
1609
1610 let response = self
1611 .make_post_request(&request, &uri)
1612 .await?;
1613 trace!(?response, "Received response from Tycho server");
1614
1615 let body = response
1616 .text()
1617 .await
1618 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1619
1620 if body.is_empty() {
1621 return Ok(Page::new(vec![], 0, 0, 0));
1623 }
1624
1625 let dto_response = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
1626 .map_err(|err| RPCError::from_parse_error(err, &body))?;
1627 trace!(?dto_response, "Received protocol_states response from Tycho server");
1628
1629 let data: Vec<ProtocolComponentState> = dto_response
1630 .states
1631 .into_iter()
1632 .map(ProtocolComponentState::from)
1633 .collect();
1634 Ok(Page::new(
1635 data,
1636 dto_response.pagination.total,
1637 dto_response.pagination.page,
1638 dto_response.pagination.page_size,
1639 ))
1640 }
1641
1642 async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError> {
1643 let request = TokensRequestBody {
1644 token_addresses: None,
1645 min_quality: params.min_quality,
1646 traded_n_days_ago: params.traded_n_days_ago,
1647 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1648 chain: params.chain.into(),
1649 };
1650
1651 let uri = format!(
1652 "{}/{}/tokens",
1653 self.url
1654 .to_string()
1655 .trim_end_matches('/'),
1656 TYCHO_SERVER_VERSION
1657 );
1658 debug!(%uri, "Sending tokens request to Tycho server");
1659
1660 let response = self
1661 .make_post_request(&request, &uri)
1662 .await?;
1663
1664 let body = response
1665 .text()
1666 .await
1667 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1668 let dto_response = serde_json::from_str::<TokensRequestResponse>(&body)
1669 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1670
1671 let data: Vec<Token> = dto_response
1672 .tokens
1673 .into_iter()
1674 .map(Token::from)
1675 .collect();
1676 Ok(Page::new(
1677 data,
1678 dto_response.pagination.total,
1679 dto_response.pagination.page,
1680 dto_response.pagination.page_size,
1681 ))
1682 }
1683
1684 async fn get_protocol_systems(
1685 &self,
1686 params: ProtocolSystemsParams,
1687 ) -> Result<Page<ProtocolSystems>, RPCError> {
1688 let request = ProtocolSystemsRequestBody {
1689 chain: params.chain.into(),
1690 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1691 };
1692
1693 let uri = format!(
1694 "{}/{}/protocol_systems",
1695 self.url
1696 .to_string()
1697 .trim_end_matches('/'),
1698 TYCHO_SERVER_VERSION
1699 );
1700 debug!(%uri, "Sending protocol_systems request to Tycho server");
1701 trace!(?request, "Sending request to Tycho server");
1702 let response = self
1703 .make_post_request(&request, &uri)
1704 .await?;
1705 trace!(?response, "Received response from Tycho server");
1706 let body = response
1707 .text()
1708 .await
1709 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1710 let dto = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
1711 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1712 trace!(?dto, "Received protocol_systems response from Tycho server");
1713 Ok(Page::new(
1714 ProtocolSystems::new(dto.protocol_systems, dto.dci_protocols),
1715 dto.pagination.total,
1716 dto.pagination.page,
1717 dto.pagination.page_size,
1718 ))
1719 }
1720
1721 async fn get_component_tvl(
1722 &self,
1723 params: ComponentTvlParams,
1724 ) -> Result<Page<HashMap<String, f64>>, RPCError> {
1725 let request = ComponentTvlRequestBody {
1726 chain: params.chain.into(),
1727 protocol_system: params.protocol_system,
1728 component_ids: params.component_ids,
1729 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1730 };
1731
1732 let uri = format!(
1733 "{}/{}/component_tvl",
1734 self.url
1735 .to_string()
1736 .trim_end_matches('/'),
1737 TYCHO_SERVER_VERSION
1738 );
1739 debug!(%uri, "Sending get_component_tvl request to Tycho server");
1740 trace!(?request, "Sending request to Tycho server");
1741 let response = self
1742 .make_post_request(&request, &uri)
1743 .await?;
1744 trace!(?response, "Received response from Tycho server");
1745 let body = response
1746 .text()
1747 .await
1748 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1749 let dto_response =
1750 serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
1751 error!("Failed to parse component_tvl response: {:?}", &body);
1752 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1753 })?;
1754 trace!(?dto_response, "Received component_tvl response from Tycho server");
1755 Ok(Page::new(
1756 dto_response.tvl,
1757 dto_response.pagination.total,
1758 dto_response.pagination.page,
1759 dto_response.pagination.page_size,
1760 ))
1761 }
1762
1763 async fn get_traced_entry_points(
1764 &self,
1765 params: TracedEntryPointsParams,
1766 ) -> Result<Page<TracedEntryPoints>, RPCError> {
1767 let request = TracedEntryPointRequestBody {
1768 chain: params.chain.into(),
1769 protocol_system: params.protocol_system,
1770 component_ids: params.component_ids,
1771 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1772 };
1773
1774 let uri = format!(
1775 "{}/{TYCHO_SERVER_VERSION}/traced_entry_points",
1776 self.url
1777 .to_string()
1778 .trim_end_matches('/')
1779 );
1780 debug!(%uri, "Sending traced_entry_points request to Tycho server");
1781 trace!(?request, "Sending request to Tycho server");
1782
1783 let response = self
1784 .make_post_request(&request, &uri)
1785 .await?;
1786
1787 trace!(?response, "Received response from Tycho server");
1788
1789 let body = response
1790 .text()
1791 .await
1792 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1793 let dto_response =
1794 serde_json::from_str::<TracedEntryPointRequestResponse>(&body).map_err(|err| {
1795 error!("Failed to parse traced_entry_points response: {:?}", &body);
1796 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1797 })?;
1798 trace!(?dto_response, "Received traced_entry_points response from Tycho server");
1799 let data: TracedEntryPoints = dto_response
1800 .traced_entry_points
1801 .into_iter()
1802 .map(|(k, v)| {
1803 (
1804 k,
1805 v.into_iter()
1806 .map(|(ep, tr)| {
1807 (EntryPointWithTracingParams::from(ep), TracingResult::from(tr))
1808 })
1809 .collect(),
1810 )
1811 })
1812 .collect();
1813 Ok(Page::new(
1814 data,
1815 dto_response.pagination.total,
1816 dto_response.pagination.page,
1817 dto_response.pagination.page_size,
1818 ))
1819 }
1820
1821 async fn get_snapshots<'a>(
1822 &self,
1823 request: &SnapshotParameters<'a>,
1824 chunk_size: Option<usize>,
1825 concurrency: usize,
1826 ) -> Result<Snapshot, RPCError> {
1827 let component_ids: Vec<_> = request
1828 .components
1829 .keys()
1830 .cloned()
1831 .collect();
1832
1833 let component_tvl = if request.include_tvl && !component_ids.is_empty() {
1834 self.get_component_tvl_paginated(
1835 ComponentTvlPaginatedParams::new(request.chain, concurrency)
1836 .with_component_ids(component_ids.clone()),
1837 )
1838 .await?
1839 } else {
1840 HashMap::new()
1841 };
1842
1843 let version = VersionParam::at_block(request.chain.into(), request.block_number);
1844
1845 let mut protocol_states = if !component_ids.is_empty() {
1846 self.get_protocol_states_paginated(
1847 ProtocolStatesPaginatedParams::new(
1848 request.chain,
1849 request.protocol_system,
1850 concurrency,
1851 )
1852 .with_protocol_ids(component_ids.clone())
1853 .with_include_balances(request.include_balances)
1854 .with_version(version.clone()),
1855 )
1856 .await?
1857 .into_iter()
1858 .map(|state| (state.component_id.clone(), state))
1859 .collect()
1860 } else {
1861 HashMap::new()
1862 };
1863
1864 let states = request
1866 .components
1867 .values()
1868 .filter_map(|component| {
1869 if let Some(state) = protocol_states.remove(&component.id) {
1870 Some((
1871 component.id.clone(),
1872 ComponentWithState {
1873 state,
1874 component: component.clone(),
1875 component_tvl: component_tvl
1876 .get(&component.id)
1877 .cloned(),
1878 entrypoints: request
1879 .entrypoints
1880 .as_ref()
1881 .and_then(|map| map.get(&component.id))
1882 .cloned()
1883 .unwrap_or_default(),
1884 },
1885 ))
1886 } else if component_ids.contains(&component.id) {
1887 let component_id = &component.id;
1889 error!(?component_id, "Missing state for native component!");
1890 None
1891 } else {
1892 None
1893 }
1894 })
1895 .collect();
1896
1897 let vm_storage = if !request.contract_ids.is_empty() {
1898 let mut cp_params = ContractStatePaginatedParams::new(
1899 request.chain,
1900 request.protocol_system,
1901 concurrency,
1902 )
1903 .with_contract_ids(request.contract_ids.to_vec())
1904 .with_version(version.clone());
1905 if let Some(cs) = chunk_size {
1906 cp_params = cp_params.with_chunk_size(cs);
1907 }
1908 let contract_states = self
1909 .get_contract_state_paginated(cp_params)
1910 .await?
1911 .into_iter()
1912 .map(|acc| (acc.address.clone(), acc))
1913 .collect::<HashMap<_, _>>();
1914
1915 trace!(states=?&contract_states, "Retrieved ContractState");
1916
1917 let contract_address_to_components = request
1918 .components
1919 .iter()
1920 .filter_map(|(id, comp)| {
1921 if component_ids.contains(id) {
1922 Some(
1923 comp.contract_addresses
1924 .iter()
1925 .map(|address| (address.clone(), comp.id.clone())),
1926 )
1927 } else {
1928 None
1929 }
1930 })
1931 .flatten()
1932 .fold(HashMap::<Bytes, Vec<String>>::new(), |mut acc, (addr, c_id)| {
1933 acc.entry(addr).or_default().push(c_id);
1934 acc
1935 });
1936
1937 request
1938 .contract_ids
1939 .iter()
1940 .filter_map(|address| {
1941 if let Some(state) = contract_states.get(address) {
1942 Some((address.clone(), state.clone()))
1943 } else if let Some(ids) = contract_address_to_components.get(address) {
1944 error!(
1946 ?address,
1947 ?ids,
1948 "Component with lacking contract storage encountered!"
1949 );
1950 None
1951 } else {
1952 None
1953 }
1954 })
1955 .collect()
1956 } else {
1957 HashMap::new()
1958 };
1959
1960 Ok(Snapshot { states, vm_storage })
1961 }
1962}
1963
1964#[cfg(test)]
1965mod tests {
1966 use std::{
1967 collections::{HashMap, HashSet},
1968 str::FromStr,
1969 };
1970
1971 use mockito::Server;
1972 use rstest::rstest;
1973 use tycho_common::models::blockchain::AddressStorageLocation;
1974
1975 use super::*;
1976
1977 impl MockRPCClient {
1980 #[allow(clippy::too_many_arguments)]
1981 async fn test_get_protocol_states_paginated<T>(
1982 &self,
1983 chain: Chain,
1984 ids: &[T],
1985 protocol_system: &str,
1986 include_balances: bool,
1987 block_number: Option<u64>,
1988 chunk_size: usize,
1989 _concurrency: usize,
1990 ) -> Vec<(Chain, Vec<String>, String, bool, Option<u64>, PaginationParams)>
1991 where
1992 T: AsRef<str> + Clone + Send + Sync + 'static,
1993 {
1994 ids.chunks(chunk_size)
1995 .map(|chunk| {
1996 (
1997 chain,
1998 chunk
1999 .iter()
2000 .map(|id| id.as_ref().to_string())
2001 .collect(),
2002 protocol_system.to_string(),
2003 include_balances,
2004 block_number,
2005 PaginationParams { page: 0, page_size: chunk_size as i64 },
2006 )
2007 })
2008 .collect()
2009 }
2010 }
2011
    /// Canned JSON body for the `contract_state` endpoint: a single account plus pagination
    /// metadata. Used as the mock server response in the tests of this module.
    const GET_CONTRACT_STATE_RESP: &str = r#"
    {
        "accounts": [
            {
                "chain": "ethereum",
                "address": "0x0000000000000000000000000000000000000000",
                "title": "",
                "slots": {},
                "native_balance": "0x01f4",
                "token_balances": {},
                "code": "0x00",
                "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
                "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
                "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
                "creation_tx": null
            }
        ],
        "pagination": {
            "page": 0,
            "page_size": 20,
            "total": 10
        }
    }
    "#;
2036
2037 #[rstest]
2038 #[case::string_input(vec![
2039 "id1".to_string(),
2040 "id2".to_string()
2041 ])]
2042 #[tokio::test]
2043 async fn test_get_protocol_states_paginated<T>(#[case] ids: Vec<T>)
2044 where
2045 T: AsRef<str> + Clone + Send + Sync + 'static,
2046 {
2047 let mock_client = MockRPCClient::new();
2048
2049 let request_args = mock_client
2050 .test_get_protocol_states_paginated(
2051 Chain::Ethereum,
2052 &ids,
2053 "test_system",
2054 true,
2055 None,
2056 2,
2057 2,
2058 )
2059 .await;
2060
2061 assert_eq!(request_args.len(), 1);
2063 assert_eq!(request_args[0].1.len(), 2);
2064 }
2065
2066 #[tokio::test]
2067 async fn test_get_contract_state() {
2068 let mut server = Server::new_async().await;
2069 let server_resp = GET_CONTRACT_STATE_RESP;
2070 serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
2072
2073 let mocked_server = server
2074 .mock("POST", "/v1/contract_state")
2075 .expect(1)
2076 .with_body(server_resp)
2077 .create_async()
2078 .await;
2079
2080 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2081 .expect("create client");
2082
2083 let accounts = client
2084 .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
2085 .await
2086 .expect("get state");
2087
2088 mocked_server.assert();
2089 assert_eq!(accounts.data().len(), 1);
2090 assert_eq!(accounts.data()[0].slots, HashMap::new());
2091 assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
2092 assert_eq!(accounts.data()[0].code, [0].to_vec());
2093 assert_eq!(
2094 accounts.data()[0].code_hash,
2095 hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
2096 .unwrap()
2097 );
2098 }
2099
2100 #[tokio::test]
2101 async fn test_get_protocol_components() {
2102 let mut server = Server::new_async().await;
2103 let server_resp = r#"
2104 {
2105 "protocol_components": [
2106 {
2107 "id": "State1",
2108 "protocol_system": "ambient",
2109 "protocol_type_name": "Pool",
2110 "chain": "ethereum",
2111 "tokens": [
2112 "0x0000000000000000000000000000000000000000",
2113 "0x0000000000000000000000000000000000000001"
2114 ],
2115 "contract_ids": [
2116 "0x0000000000000000000000000000000000000000"
2117 ],
2118 "static_attributes": {
2119 "attribute_1": "0x00000000000003e8"
2120 },
2121 "change": "Creation",
2122 "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2123 "created_at": "2022-01-01T00:00:00"
2124 }
2125 ],
2126 "pagination": {
2127 "page": 0,
2128 "page_size": 20,
2129 "total": 10
2130 }
2131 }
2132 "#;
2133 serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
2135
2136 let mocked_server = server
2137 .mock("POST", "/v1/protocol_components")
2138 .expect(1)
2139 .with_body(server_resp)
2140 .create_async()
2141 .await;
2142
2143 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2144 .expect("create client");
2145
2146 let components = client
2147 .get_protocol_components(ProtocolComponentsParams::new(Chain::Ethereum, ""))
2148 .await
2149 .expect("get state");
2150
2151 mocked_server.assert();
2152 assert_eq!(components.data().len(), 1);
2153 assert_eq!(components.data()[0].id, "State1");
2154 assert_eq!(components.data()[0].protocol_system, "ambient");
2155 assert_eq!(components.data()[0].protocol_type_name, "Pool");
2156 assert_eq!(components.data()[0].tokens.len(), 2);
2157 let expected_attributes =
2158 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2159 .iter()
2160 .cloned()
2161 .collect::<HashMap<String, Bytes>>();
2162 assert_eq!(components.data()[0].static_attributes, expected_attributes);
2163 }
2164
2165 #[tokio::test]
2166 async fn test_get_protocol_states() {
2167 let mut server = Server::new_async().await;
2168 let server_resp = r#"
2169 {
2170 "states": [
2171 {
2172 "component_id": "State1",
2173 "attributes": {
2174 "attribute_1": "0x00000000000003e8"
2175 },
2176 "balances": {
2177 "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
2178 }
2179 }
2180 ],
2181 "pagination": {
2182 "page": 0,
2183 "page_size": 20,
2184 "total": 10
2185 }
2186 }
2187 "#;
2188 serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
2190
2191 let mocked_server = server
2192 .mock("POST", "/v1/protocol_state")
2193 .expect(1)
2194 .with_body(server_resp)
2195 .create_async()
2196 .await;
2197 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2198 .expect("create client");
2199
2200 let states = client
2201 .get_protocol_states(
2202 ProtocolStatesParams::new(Chain::Ethereum, "").with_include_balances(true),
2203 )
2204 .await
2205 .expect("get state");
2206
2207 mocked_server.assert();
2208 assert_eq!(states.data().len(), 1);
2209 assert_eq!(states.data()[0].component_id, "State1");
2210 let expected_attributes =
2211 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2212 .iter()
2213 .cloned()
2214 .collect::<HashMap<String, Bytes>>();
2215 assert_eq!(states.data()[0].attributes, expected_attributes);
2216 let expected_balances = [(
2217 Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
2218 .expect("Unsupported address format"),
2219 Bytes::from_str("0x01f4").unwrap(),
2220 )]
2221 .iter()
2222 .cloned()
2223 .collect::<HashMap<Bytes, Bytes>>();
2224 assert_eq!(states.data()[0].balances, expected_balances);
2225 }
2226
2227 #[tokio::test]
2228 async fn test_get_tokens() {
2229 let mut server = Server::new_async().await;
2230 let server_resp = r#"
2231 {
2232 "tokens": [
2233 {
2234 "chain": "ethereum",
2235 "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
2236 "symbol": "WETH",
2237 "decimals": 18,
2238 "tax": 0,
2239 "gas": [
2240 29962
2241 ],
2242 "quality": 100
2243 },
2244 {
2245 "chain": "ethereum",
2246 "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
2247 "symbol": "USDC",
2248 "decimals": 6,
2249 "tax": 0,
2250 "gas": [
2251 40652
2252 ],
2253 "quality": 100
2254 }
2255 ],
2256 "pagination": {
2257 "page": 0,
2258 "page_size": 20,
2259 "total": 10
2260 }
2261 }
2262 "#;
2263 serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
2265
2266 let mocked_server = server
2267 .mock("POST", "/v1/tokens")
2268 .expect(1)
2269 .with_body(server_resp)
2270 .create_async()
2271 .await;
2272 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2273 .expect("create client");
2274
2275 let tokens = client
2276 .get_tokens(TokensParams::new(Chain::Ethereum))
2277 .await
2278 .expect("get tokens");
2279
2280 let expected = vec![
2281 Token {
2282 chain: tycho_common::models::Chain::Ethereum,
2283 address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
2284 symbol: "WETH".to_string(),
2285 decimals: 18,
2286 tax: 0,
2287 gas: vec![Some(29962)],
2288 quality: 100,
2289 },
2290 Token {
2291 chain: tycho_common::models::Chain::Ethereum,
2292 address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
2293 symbol: "USDC".to_string(),
2294 decimals: 6,
2295 tax: 0,
2296 gas: vec![Some(40652)],
2297 quality: 100,
2298 },
2299 ];
2300
2301 mocked_server.assert();
2302 assert_eq!(*tokens.data(), expected);
2303 }
2304
2305 #[rstest]
2306 #[case::with_dci(Some(vec!["system2"]), vec!["system2"])]
2307 #[case::backward_compat(None, vec![])]
2308 #[tokio::test]
2309 async fn test_get_protocol_systems(
2310 #[case] dci_protocols: Option<Vec<&str>>,
2311 #[case] expected_dci: Vec<&str>,
2312 ) {
2313 use serde_json::json;
2314
2315 let mut json_value = json!({
2316 "protocol_systems": ["system1", "system2"],
2317 "pagination": { "page": 0, "page_size": 20, "total": 2 }
2318 });
2319 if let Some(dci) = dci_protocols {
2320 json_value["dci_protocols"] = json!(dci);
2321 }
2322 let server_resp = serde_json::to_string(&json_value).unwrap();
2323
2324 let mut server = Server::new_async().await;
2325 let mocked_server = server
2326 .mock("POST", "/v1/protocol_systems")
2327 .expect(1)
2328 .with_body(&server_resp)
2329 .create_async()
2330 .await;
2331 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2332 .expect("create client");
2333
2334 let response = client
2335 .get_protocol_systems(ProtocolSystemsParams::new(Chain::Ethereum))
2336 .await
2337 .expect("get protocol systems");
2338
2339 mocked_server.assert();
2340 assert_eq!(response.data().protocol_systems(), ["system1", "system2"]);
2341 assert_eq!(response.data().dci_protocols(), expected_dci.as_slice());
2342 }
2343
2344 #[tokio::test]
2345 async fn test_get_component_tvl() {
2346 let mut server = Server::new_async().await;
2347 let server_resp = r#"
2348 {
2349 "tvl": {
2350 "component1": 100.0
2351 },
2352 "pagination": {
2353 "page": 0,
2354 "page_size": 20,
2355 "total": 10
2356 }
2357 }
2358 "#;
2359 serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
2361
2362 let mocked_server = server
2363 .mock("POST", "/v1/component_tvl")
2364 .expect(1)
2365 .with_body(server_resp)
2366 .create_async()
2367 .await;
2368 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2369 .expect("create client");
2370
2371 let component_tvl = client
2372 .get_component_tvl(ComponentTvlParams::new(Chain::Ethereum))
2373 .await
2374 .expect("get component tvl");
2375
2376 mocked_server.assert();
2377 assert_eq!(component_tvl.data().get("component1"), Some(&100.0));
2378 }
2379
2380 #[tokio::test]
2381 async fn test_get_traced_entry_points() {
2382 let mut server = Server::new_async().await;
2383 let server_resp = r#"
2384 {
2385 "traced_entry_points": {
2386 "component_1": [
2387 [
2388 {
2389 "entry_point": {
2390 "external_id": "entrypoint_a",
2391 "target": "0x0000000000000000000000000000000000000001",
2392 "signature": "sig()"
2393 },
2394 "params": {
2395 "method": "rpctracer",
2396 "caller": "0x000000000000000000000000000000000000000a",
2397 "calldata": "0x000000000000000000000000000000000000000b"
2398 }
2399 },
2400 {
2401 "retriggers": [
2402 [
2403 "0x00000000000000000000000000000000000000aa",
2404 {"key": "0x0000000000000000000000000000000000000aaa", "offset": 12}
2405 ]
2406 ],
2407 "accessed_slots": {
2408 "0x0000000000000000000000000000000000aaaa": [
2409 "0x0000000000000000000000000000000000aaaa"
2410 ]
2411 }
2412 }
2413 ]
2414 ]
2415 },
2416 "pagination": {
2417 "page": 0,
2418 "page_size": 20,
2419 "total": 1
2420 }
2421 }
2422 "#;
2423 serde_json::from_str::<TracedEntryPointRequestResponse>(server_resp).expect("deserialize");
2425
2426 let mocked_server = server
2427 .mock("POST", "/v1/traced_entry_points")
2428 .expect(1)
2429 .with_body(server_resp)
2430 .create_async()
2431 .await;
2432 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2433 .expect("create client");
2434
2435 let entrypoints = client
2436 .get_traced_entry_points(TracedEntryPointsParams::new(Chain::Ethereum, ""))
2437 .await
2438 .expect("get traced entry points");
2439
2440 mocked_server.assert();
2441 assert_eq!(entrypoints.data().len(), 1);
2442 let comp1_entrypoints = entrypoints
2443 .data()
2444 .get("component_1")
2445 .expect("component_1 entrypoints should exist");
2446 assert_eq!(comp1_entrypoints.len(), 1);
2447
2448 let (entrypoint, trace_result) = &comp1_entrypoints[0];
2449 assert_eq!(entrypoint.entry_point.external_id, "entrypoint_a");
2450 assert_eq!(
2451 entrypoint.entry_point.target,
2452 Bytes::from_str("0x0000000000000000000000000000000000000001").unwrap()
2453 );
2454 assert_eq!(entrypoint.entry_point.signature, "sig()");
2455 let tycho_common::models::blockchain::TracingParams::RPCTracer(rpc_params) =
2456 &entrypoint.params;
2457 assert_eq!(
2458 rpc_params.caller,
2459 Some(Bytes::from("0x000000000000000000000000000000000000000a"))
2460 );
2461 assert_eq!(rpc_params.calldata, Bytes::from("0x000000000000000000000000000000000000000b"));
2462
2463 assert_eq!(
2464 trace_result.retriggers,
2465 HashSet::from([(
2466 Bytes::from("0x00000000000000000000000000000000000000aa"),
2467 AddressStorageLocation::new(
2468 Bytes::from("0x0000000000000000000000000000000000000aaa"),
2469 12
2470 )
2471 )])
2472 );
2473 assert_eq!(trace_result.accessed_slots.len(), 1);
2474 assert_eq!(
2475 trace_result.accessed_slots,
2476 HashMap::from([(
2477 Bytes::from("0x0000000000000000000000000000000000aaaa"),
2478 HashSet::from([Bytes::from("0x0000000000000000000000000000000000aaaa")])
2479 )])
2480 );
2481 }
2482
/// A purely numeric retry value is expected to resolve to roughly "now + that many seconds".
///
/// This is a plain `#[test]`: `parse_retry_value` is called synchronously here, so spinning
/// up an async runtime for the test is unnecessary.
#[test]
fn test_parse_retry_value_numeric() {
    let result = parse_retry_value("60");
    assert!(result.is_some());

    let expected_time = SystemTime::now() + Duration::from_secs(60);
    let actual_time = result.unwrap();

    // `duration_since` errs when `expected_time` is later than `actual_time`; the error
    // carries the same gap as a positive duration, so both arms yield the absolute difference.
    let diff = actual_time
        .duration_since(expected_time)
        .unwrap_or_else(|behind| behind.duration());
    assert!(diff < Duration::from_secs(1), "Time difference too large: {:?}", diff);
}
2503
/// An RFC 2822 date string is expected to parse into an absolute point in time.
///
/// The fixture date must stay in the future relative to the machine running the test,
/// because the final assertion compares it against `SystemTime::now()`.
#[test]
fn test_parse_retry_value_rfc2822() {
    // 1 January 2030 is a Tuesday; keeping the weekday consistent with the date makes the
    // fixture a valid RFC 2822 timestamp even for parsers that cross-check the weekday.
    let rfc2822_date = "Tue, 01 Jan 2030 12:00:00 +0000";
    let result = parse_retry_value(rfc2822_date);
    assert!(result.is_some());

    let parsed_time = result.unwrap();
    assert!(parsed_time > SystemTime::now());
}
2514
/// Inputs that are neither a number of seconds nor a valid RFC 2822 date must yield `None`.
///
/// This is a plain `#[test]`: nothing in the body awaits, so no async runtime is needed.
#[test]
fn test_parse_retry_value_invalid_formats() {
    let invalid_inputs = [
        "invalid",
        "",
        "not_a_number",
        // Shaped like an RFC 2822 date, but with an out-of-range day and hour.
        "Mon, 32 Jan 2030 25:00:00 +0000",
    ];

    for input in invalid_inputs {
        assert!(parse_retry_value(input).is_none());
    }
}
2524
/// A retry value of `"0"` is expected to resolve to (approximately) the current time.
///
/// This is a plain `#[test]`: `parse_retry_value` is called synchronously here, so spinning
/// up an async runtime for the test is unnecessary.
#[test]
fn test_parse_retry_value_zero_seconds() {
    let result = parse_retry_value("0");
    assert!(result.is_some());

    let expected_time = SystemTime::now();
    let actual_time = result.unwrap();

    // Absolute difference between the two instants, regardless of which one is later.
    let diff = match actual_time.duration_since(expected_time) {
        Ok(ahead) => ahead,
        Err(behind) => behind.duration(),
    };
    assert!(diff < Duration::from_secs(1));
}
2545
2546 #[tokio::test]
2547 async fn test_error_for_response_rate_limited() {
2548 let mut server = Server::new_async().await;
2549 let mock = server
2550 .mock("GET", "/test")
2551 .with_status(429)
2552 .with_header("Retry-After", "60")
2553 .create_async()
2554 .await;
2555
2556 let client = reqwest::Client::new();
2557 let response = client
2558 .get(format!("{}/test", server.url()))
2559 .send()
2560 .await
2561 .unwrap();
2562
2563 let http_client =
2564 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2565 .unwrap()
2566 .with_test_backoff_policy();
2567 let result = http_client
2568 .error_for_response(response)
2569 .await;
2570
2571 mock.assert();
2572 assert!(matches!(result, Err(RPCError::RateLimited(_))));
2573 if let Err(RPCError::RateLimited(retry_after)) = result {
2574 assert!(retry_after.is_some());
2575 }
2576 }
2577
2578 #[tokio::test]
2579 async fn test_error_for_response_rate_limited_no_header() {
2580 let mut server = Server::new_async().await;
2581 let mock = server
2582 .mock("GET", "/test")
2583 .with_status(429)
2584 .create_async()
2585 .await;
2586
2587 let client = reqwest::Client::new();
2588 let response = client
2589 .get(format!("{}/test", server.url()))
2590 .send()
2591 .await
2592 .unwrap();
2593
2594 let http_client =
2595 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2596 .unwrap()
2597 .with_test_backoff_policy();
2598 let result = http_client
2599 .error_for_response(response)
2600 .await;
2601
2602 mock.assert();
2603 assert!(matches!(result, Err(RPCError::RateLimited(None))));
2604 }
2605
2606 #[tokio::test]
2607 async fn test_error_for_response_server_errors() {
2608 let test_cases =
2609 vec![(502, "Bad Gateway"), (503, "Service Unavailable"), (504, "Gateway Timeout")];
2610
2611 for (status_code, expected_body) in test_cases {
2612 let mut server = Server::new_async().await;
2613 let mock = server
2614 .mock("GET", "/test")
2615 .with_status(status_code)
2616 .with_body(expected_body)
2617 .create_async()
2618 .await;
2619
2620 let client = reqwest::Client::new();
2621 let response = client
2622 .get(format!("{}/test", server.url()))
2623 .send()
2624 .await
2625 .unwrap();
2626
2627 let http_client =
2628 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2629 .unwrap()
2630 .with_test_backoff_policy();
2631 let result = http_client
2632 .error_for_response(response)
2633 .await;
2634
2635 mock.assert();
2636 assert!(matches!(result, Err(RPCError::ServerUnreachable(_))));
2637 if let Err(RPCError::ServerUnreachable(body)) = result {
2638 assert_eq!(body, expected_body);
2639 }
2640 }
2641 }
2642
2643 #[tokio::test]
2644 async fn test_error_for_response_success() {
2645 let mut server = Server::new_async().await;
2646 let mock = server
2647 .mock("GET", "/test")
2648 .with_status(200)
2649 .with_body("success")
2650 .create_async()
2651 .await;
2652
2653 let client = reqwest::Client::new();
2654 let response = client
2655 .get(format!("{}/test", server.url()))
2656 .send()
2657 .await
2658 .unwrap();
2659
2660 let http_client =
2661 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2662 .unwrap()
2663 .with_test_backoff_policy();
2664 let result = http_client
2665 .error_for_response(response)
2666 .await;
2667
2668 mock.assert();
2669 assert!(result.is_ok());
2670
2671 let response = result.unwrap();
2672 assert_eq!(response.status(), 200);
2673 }
2674
2675 #[tokio::test]
2676 async fn test_handle_error_for_backoff_server_unreachable() {
2677 let http_client =
2678 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2679 .unwrap()
2680 .with_test_backoff_policy();
2681 let error = RPCError::ServerUnreachable("Service down".to_string());
2682
2683 let backoff_error = http_client
2684 .handle_error_for_backoff(error)
2685 .await;
2686
2687 match backoff_error {
2688 backoff::Error::Transient { err: RPCError::ServerUnreachable(msg), retry_after } => {
2689 assert_eq!(msg, "Service down");
2690 assert_eq!(retry_after, Some(Duration::from_millis(50))); }
2692 _ => panic!("Expected transient error for ServerUnreachable"),
2693 }
2694 }
2695
2696 #[tokio::test]
2697 async fn test_handle_error_for_backoff_rate_limited_with_retry_after() {
2698 let http_client =
2699 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2700 .unwrap()
2701 .with_test_backoff_policy();
2702 let future_time = SystemTime::now() + Duration::from_secs(30);
2703 let error = RPCError::RateLimited(Some(future_time));
2704
2705 let backoff_error = http_client
2706 .handle_error_for_backoff(error)
2707 .await;
2708
2709 match backoff_error {
2710 backoff::Error::Transient { err: RPCError::RateLimited(retry_after), .. } => {
2711 assert_eq!(retry_after, Some(future_time));
2712 }
2713 _ => panic!("Expected transient error for RateLimited"),
2714 }
2715
2716 let stored_retry_after = http_client.retry_after.read().await;
2718 assert_eq!(*stored_retry_after, Some(future_time));
2719 }
2720
2721 #[tokio::test]
2722 async fn test_handle_error_for_backoff_rate_limited_no_retry_after() {
2723 let http_client =
2724 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2725 .unwrap()
2726 .with_test_backoff_policy();
2727 let error = RPCError::RateLimited(None);
2728
2729 let backoff_error = http_client
2730 .handle_error_for_backoff(error)
2731 .await;
2732
2733 match backoff_error {
2734 backoff::Error::Transient { err: RPCError::RateLimited(None), .. } => {
2735 }
2737 _ => panic!("Expected transient error for RateLimited without retry-after"),
2738 }
2739 }
2740
2741 #[tokio::test]
2742 async fn test_handle_error_for_backoff_other_errors() {
2743 let http_client =
2744 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2745 .unwrap()
2746 .with_test_backoff_policy();
2747 let error = RPCError::ParseResponse("Invalid JSON".to_string());
2748
2749 let backoff_error = http_client
2750 .handle_error_for_backoff(error)
2751 .await;
2752
2753 match backoff_error {
2754 backoff::Error::Permanent(RPCError::ParseResponse(msg)) => {
2755 assert_eq!(msg, "Invalid JSON");
2756 }
2757 _ => panic!("Expected permanent error for ParseResponse"),
2758 }
2759 }
2760
2761 #[tokio::test]
2762 async fn test_wait_until_retry_after_no_retry_time() {
2763 let http_client =
2764 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2765 .unwrap()
2766 .with_test_backoff_policy();
2767
2768 let start = std::time::Instant::now();
2769 http_client
2770 .wait_until_retry_after()
2771 .await;
2772 let elapsed = start.elapsed();
2773
2774 assert!(elapsed < Duration::from_millis(100));
2776 }
2777
2778 #[tokio::test]
2779 async fn test_wait_until_retry_after_past_time() {
2780 let http_client =
2781 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2782 .unwrap()
2783 .with_test_backoff_policy();
2784
2785 let past_time = SystemTime::now() - Duration::from_secs(10);
2787 *http_client.retry_after.write().await = Some(past_time);
2788
2789 let start = std::time::Instant::now();
2790 http_client
2791 .wait_until_retry_after()
2792 .await;
2793 let elapsed = start.elapsed();
2794
2795 assert!(elapsed < Duration::from_millis(100));
2797 }
2798
2799 #[tokio::test]
2800 async fn test_wait_until_retry_after_future_time() {
2801 let http_client =
2802 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2803 .unwrap()
2804 .with_test_backoff_policy();
2805
2806 let future_time = SystemTime::now() + Duration::from_millis(100);
2808 *http_client.retry_after.write().await = Some(future_time);
2809
2810 let start = std::time::Instant::now();
2811 http_client
2812 .wait_until_retry_after()
2813 .await;
2814 let elapsed = start.elapsed();
2815
2816 assert!(elapsed >= Duration::from_millis(80)); assert!(elapsed <= Duration::from_millis(200)); }
2820
2821 #[tokio::test]
2822 async fn test_make_post_request_success() {
2823 let mut server = Server::new_async().await;
2824 let server_resp = r#"{"success": true}"#;
2825
2826 let mock = server
2827 .mock("POST", "/test")
2828 .with_status(200)
2829 .with_body(server_resp)
2830 .create_async()
2831 .await;
2832
2833 let http_client =
2834 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2835 .unwrap()
2836 .with_test_backoff_policy();
2837 let request_body = serde_json::json!({"test": "data"});
2838 let uri = format!("{}/test", server.url());
2839
2840 let result = http_client
2841 .make_post_request(&request_body, &uri)
2842 .await;
2843
2844 mock.assert();
2845 assert!(result.is_ok());
2846
2847 let response = result.unwrap();
2848 assert_eq!(response.status(), 200);
2849 assert_eq!(response.text().await.unwrap(), server_resp);
2850 }
2851
2852 #[tokio::test]
2853 async fn test_make_post_request_retry_on_server_error() {
2854 let mut server = Server::new_async().await;
2855 let error_mock = server
2857 .mock("POST", "/test")
2858 .with_status(503)
2859 .with_body("Service Unavailable")
2860 .expect(1)
2861 .create_async()
2862 .await;
2863
2864 let success_mock = server
2865 .mock("POST", "/test")
2866 .with_status(200)
2867 .with_body(r#"{"success": true}"#)
2868 .expect(1)
2869 .create_async()
2870 .await;
2871
2872 let http_client =
2873 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2874 .unwrap()
2875 .with_test_backoff_policy();
2876 let request_body = serde_json::json!({"test": "data"});
2877 let uri = format!("{}/test", server.url());
2878
2879 let result = http_client
2880 .make_post_request(&request_body, &uri)
2881 .await;
2882
2883 error_mock.assert();
2884 success_mock.assert();
2885 assert!(result.is_ok());
2886 }
2887
2888 #[tokio::test]
2889 async fn test_make_post_request_respect_retry_after_header() {
2890 let mut server = Server::new_async().await;
2891
2892 let rate_limit_mock = server
2894 .mock("POST", "/test")
2895 .with_status(429)
2896 .with_header("Retry-After", "1") .expect(1)
2898 .create_async()
2899 .await;
2900
2901 let success_mock = server
2902 .mock("POST", "/test")
2903 .with_status(200)
2904 .with_body(r#"{"success": true}"#)
2905 .expect(1)
2906 .create_async()
2907 .await;
2908
2909 let http_client =
2910 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2911 .unwrap()
2912 .with_test_backoff_policy();
2913 let request_body = serde_json::json!({"test": "data"});
2914 let uri = format!("{}/test", server.url());
2915
2916 let start = std::time::Instant::now();
2917 let result = http_client
2918 .make_post_request(&request_body, &uri)
2919 .await;
2920 let elapsed = start.elapsed();
2921
2922 rate_limit_mock.assert();
2923 success_mock.assert();
2924 assert!(result.is_ok());
2925
2926 assert!(elapsed >= Duration::from_millis(900)); assert!(elapsed <= Duration::from_millis(2000)); }
2930
2931 #[tokio::test]
2932 async fn test_make_post_request_permanent_error() {
2933 let mut server = Server::new_async().await;
2934
2935 let mock = server
2936 .mock("POST", "/test")
2937 .with_status(400) .with_body("Bad Request")
2939 .expect(1)
2940 .create_async()
2941 .await;
2942
2943 let http_client =
2944 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2945 .unwrap()
2946 .with_test_backoff_policy();
2947 let request_body = serde_json::json!({"test": "data"});
2948 let uri = format!("{}/test", server.url());
2949
2950 let result = http_client
2951 .make_post_request(&request_body, &uri)
2952 .await;
2953
2954 mock.assert();
2955 assert!(result.is_ok()); let response = result.unwrap();
2958 assert_eq!(response.status(), 400);
2959 }
2960
2961 #[tokio::test]
2962 async fn test_concurrent_requests_with_different_retry_after() {
2963 let mut server = Server::new_async().await;
2964
2965 let rate_limit_mock_1 = server
2967 .mock("POST", "/test1")
2968 .with_status(429)
2969 .with_header("Retry-After", "1")
2970 .expect(1)
2971 .create_async()
2972 .await;
2973
2974 let rate_limit_mock_2 = server
2976 .mock("POST", "/test2")
2977 .with_status(429)
2978 .with_header("Retry-After", "2")
2979 .expect(1)
2980 .create_async()
2981 .await;
2982
2983 let success_mock_1 = server
2985 .mock("POST", "/test1")
2986 .with_status(200)
2987 .with_body(r#"{"result": "success1"}"#)
2988 .expect(1)
2989 .create_async()
2990 .await;
2991
2992 let success_mock_2 = server
2993 .mock("POST", "/test2")
2994 .with_status(200)
2995 .with_body(r#"{"result": "success2"}"#)
2996 .expect(1)
2997 .create_async()
2998 .await;
2999
3000 let http_client =
3001 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3002 .unwrap()
3003 .with_test_backoff_policy();
3004 let request_body = serde_json::json!({"test": "data"});
3005
3006 let uri1 = format!("{}/test1", server.url());
3007 let uri2 = format!("{}/test2", server.url());
3008
3009 let start = std::time::Instant::now();
3011 let (result1, result2) = tokio::join!(
3012 http_client.make_post_request(&request_body, &uri1),
3013 http_client.make_post_request(&request_body, &uri2)
3014 );
3015 let elapsed = start.elapsed();
3016
3017 rate_limit_mock_1.assert();
3018 rate_limit_mock_2.assert();
3019 success_mock_1.assert();
3020 success_mock_2.assert();
3021
3022 assert!(result1.is_ok());
3023 assert!(result2.is_ok());
3024
3025 assert!(elapsed >= Duration::from_millis(1800)); assert!(elapsed <= Duration::from_millis(3000)); let final_retry_after = http_client.retry_after.read().await;
3033 assert!(final_retry_after.is_some());
3034
3035 if let Some(retry_time) = *final_retry_after {
3037 let now = SystemTime::now();
3040 let diff = if retry_time > now {
3041 retry_time.duration_since(now).unwrap()
3042 } else {
3043 now.duration_since(retry_time).unwrap()
3044 };
3045
3046 assert!(diff <= Duration::from_secs(3), "Retry time difference too large: {:?}", diff);
3048 }
3049 }
3050
3051 #[tokio::test]
3052 async fn test_get_snapshots() {
3053 let mut server = Server::new_async().await;
3054
3055 let protocol_states_resp = r#"
3057 {
3058 "states": [
3059 {
3060 "component_id": "component1",
3061 "attributes": {
3062 "attribute_1": "0x00000000000003e8"
3063 },
3064 "balances": {
3065 "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
3066 }
3067 }
3068 ],
3069 "pagination": {
3070 "page": 0,
3071 "page_size": 100,
3072 "total": 1
3073 }
3074 }
3075 "#;
3076
3077 let contract_state_resp = r#"
3079 {
3080 "accounts": [
3081 {
3082 "chain": "ethereum",
3083 "address": "0x1111111111111111111111111111111111111111",
3084 "title": "",
3085 "slots": {},
3086 "native_balance": "0x01f4",
3087 "token_balances": {},
3088 "code": "0x00",
3089 "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
3090 "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3091 "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3092 "creation_tx": null
3093 }
3094 ],
3095 "pagination": {
3096 "page": 0,
3097 "page_size": 100,
3098 "total": 1
3099 }
3100 }
3101 "#;
3102
3103 let tvl_resp = r#"
3105 {
3106 "tvl": {
3107 "component1": 1000000.0
3108 },
3109 "pagination": {
3110 "page": 0,
3111 "page_size": 100,
3112 "total": 1
3113 }
3114 }
3115 "#;
3116
3117 let protocol_states_mock = server
3118 .mock("POST", "/v1/protocol_state")
3119 .expect(1)
3120 .with_body(protocol_states_resp)
3121 .create_async()
3122 .await;
3123
3124 let contract_state_mock = server
3125 .mock("POST", "/v1/contract_state")
3126 .expect(1)
3127 .with_body(contract_state_resp)
3128 .create_async()
3129 .await;
3130
3131 let tvl_mock = server
3132 .mock("POST", "/v1/component_tvl")
3133 .expect(1)
3134 .with_body(tvl_resp)
3135 .create_async()
3136 .await;
3137
3138 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3139 .expect("create client");
3140
3141 let component = tycho_common::models::protocol::ProtocolComponent {
3142 id: "component1".to_string(),
3143 protocol_system: "test_protocol".to_string(),
3144 protocol_type_name: "test_type".to_string(),
3145 chain: Chain::Ethereum,
3146 tokens: vec![],
3147 contract_addresses: vec![
3148 Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()
3149 ],
3150 static_attributes: HashMap::new(),
3151 change: tycho_common::models::ChangeType::Creation,
3152 creation_tx: Bytes::from_str(
3153 "0x0000000000000000000000000000000000000000000000000000000000000000",
3154 )
3155 .unwrap(),
3156 created_at: chrono::Utc::now().naive_utc(),
3157 };
3158
3159 let mut components = HashMap::new();
3160 components.insert("component1".to_string(), component);
3161
3162 let contract_ids =
3163 vec![Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()];
3164
3165 let request = SnapshotParameters::new(
3166 Chain::Ethereum,
3167 "test_protocol",
3168 &components,
3169 &contract_ids,
3170 12345,
3171 );
3172
3173 let response = client
3174 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3175 .await
3176 .expect("get snapshots");
3177
3178 protocol_states_mock.assert();
3180 contract_state_mock.assert();
3181 tvl_mock.assert();
3182
3183 assert_eq!(response.states.len(), 1);
3185 assert!(response
3186 .states
3187 .contains_key("component1"));
3188
3189 let component_state = response
3191 .states
3192 .get("component1")
3193 .unwrap();
3194 assert_eq!(component_state.component_tvl, Some(1000000.0));
3195
3196 assert_eq!(response.vm_storage.len(), 1);
3198 let contract_addr = Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap();
3199 assert!(response
3200 .vm_storage
3201 .contains_key(&contract_addr));
3202 }
3203
3204 #[tokio::test]
3205 async fn test_get_snapshots_empty_components() {
3206 let server = Server::new_async().await;
3207 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3208 .expect("create client");
3209
3210 let components = HashMap::new();
3211 let contract_ids = vec![];
3212
3213 let request = SnapshotParameters::new(
3214 Chain::Ethereum,
3215 "test_protocol",
3216 &components,
3217 &contract_ids,
3218 12345,
3219 );
3220
3221 let response = client
3222 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3223 .await
3224 .expect("get snapshots");
3225
3226 assert!(response.states.is_empty());
3228 assert!(response.vm_storage.is_empty());
3229 }
3230
3231 #[tokio::test]
3232 async fn test_get_snapshots_without_tvl() {
3233 let mut server = Server::new_async().await;
3234
3235 let protocol_states_resp = r#"
3236 {
3237 "states": [
3238 {
3239 "component_id": "component1",
3240 "attributes": {},
3241 "balances": {}
3242 }
3243 ],
3244 "pagination": {
3245 "page": 0,
3246 "page_size": 100,
3247 "total": 1
3248 }
3249 }
3250 "#;
3251
3252 let protocol_states_mock = server
3253 .mock("POST", "/v1/protocol_state")
3254 .expect(1)
3255 .with_body(protocol_states_resp)
3256 .create_async()
3257 .await;
3258
3259 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3260 .expect("create client");
3261
3262 let component = tycho_common::models::protocol::ProtocolComponent {
3264 id: "component1".to_string(),
3265 protocol_system: "test_protocol".to_string(),
3266 protocol_type_name: "test_type".to_string(),
3267 chain: Chain::Ethereum,
3268 tokens: vec![],
3269 contract_addresses: vec![],
3270 static_attributes: HashMap::new(),
3271 change: tycho_common::models::ChangeType::Creation,
3272 creation_tx: Bytes::from_str(
3273 "0x0000000000000000000000000000000000000000000000000000000000000000",
3274 )
3275 .unwrap(),
3276 created_at: chrono::Utc::now().naive_utc(),
3277 };
3278
3279 let mut components = HashMap::new();
3280 components.insert("component1".to_string(), component);
3281 let contract_ids = vec![];
3282
3283 let request = SnapshotParameters::new(
3284 Chain::Ethereum,
3285 "test_protocol",
3286 &components,
3287 &contract_ids,
3288 12345,
3289 )
3290 .include_balances(false)
3291 .include_tvl(false);
3292
3293 let response = client
3294 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3295 .await
3296 .expect("get snapshots");
3297
3298 protocol_states_mock.assert();
3300 assert_eq!(response.states.len(), 1);
3304 let component_state = response
3306 .states
3307 .get("component1")
3308 .unwrap();
3309 assert_eq!(component_state.component_tvl, None);
3310 }
3311
3312 #[tokio::test]
3313 async fn test_compression_enabled() {
3314 let mut server = Server::new_async().await;
3315 let server_resp = GET_CONTRACT_STATE_RESP;
3316
3317 let compressed_body =
3319 zstd::encode_all(server_resp.as_bytes(), 0).expect("compression failed");
3320
3321 let mocked_server = server
3322 .mock("POST", "/v1/contract_state")
3323 .expect(1)
3324 .with_header("Content-Encoding", "zstd")
3325 .with_body(compressed_body)
3326 .create_async()
3327 .await;
3328
3329 let client = HttpRPCClient::new(
3331 server.url().as_str(),
3332 HttpRPCClientOptions::new().with_compression(true),
3333 )
3334 .expect("create client");
3335
3336 let response = client
3337 .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3338 .await
3339 .expect("get state");
3340 let accounts = response;
3341
3342 mocked_server.assert();
3343 assert_eq!(accounts.data().len(), 1);
3344 assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3345 }
3346
3347 #[tokio::test]
3348 async fn test_compression_disabled() {
3349 let mut server = Server::new_async().await;
3350 let server_resp = GET_CONTRACT_STATE_RESP;
3351
3352 let mocked_server = server
3355 .mock("POST", "/v1/contract_state")
3356 .expect(1)
3357 .match_header("Accept-Encoding", mockito::Matcher::Missing)
3358 .with_status(200)
3359 .with_body(server_resp)
3360 .create_async()
3361 .await;
3362
3363 let client = HttpRPCClient::new(
3365 server.url().as_str(),
3366 HttpRPCClientOptions::new().with_compression(false),
3367 )
3368 .expect("create client");
3369
3370 let response = client
3371 .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3372 .await
3373 .expect("get state");
3374 let accounts = response;
3375
3376 mocked_server.assert();
3378 assert_eq!(accounts.data().len(), 1);
3379 assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3380 }
3381
3382 #[rstest]
3383 #[case::single_page(2, 1000)]
3384 #[case::multiple_pages_within_concurrency(10, 2)]
3385 #[case::exceeds_concurrency_limit(60, 2)]
3386 #[tokio::test]
3387 async fn test_get_all_tokens_pagination_and_concurrency(
3388 #[case] total_tokens: usize,
3389 #[case] page_size: usize,
3390 ) {
3391 use std::sync::atomic::{AtomicUsize, Ordering};
3392
3393 let allowed_concurrency = 10;
3394
3395 let concurrent_requests = Arc::new(AtomicUsize::new(0));
3396 let max_concurrent = Arc::new(AtomicUsize::new(0));
3397
3398 let mut server = Server::new_async().await;
3399
3400 let total_pages = (total_tokens as f64 / page_size as f64).ceil() as i64;
3401
3402 for page in 0..total_pages {
3404 let concurrent = concurrent_requests.clone();
3405 let max_conc = max_concurrent.clone();
3406
3407 let tokens_in_page = {
3408 let start_idx = (page as usize) * page_size;
3409 let end_idx = ((page as usize + 1) * page_size).min(total_tokens);
3410 (start_idx..end_idx)
3411 .map(|i| {
3412 format!(
3413 r#"{{
3414 "chain": "ethereum",
3415 "address": "0x{i:040x}",
3416 "symbol": "TOKEN_{i}",
3417 "decimals": 18,
3418 "tax": 0,
3419 "gas": [30000],
3420 "quality": 100
3421 }}"#
3422 )
3423 })
3424 .collect::<Vec<_>>()
3425 };
3426
3427 let tokens_json = tokens_in_page.join(",");
3428 let response = format!(
3429 r#"{{
3430 "tokens": [{tokens_json}],
3431 "pagination": {{
3432 "page": {page},
3433 "page_size": {page_size},
3434 "total": {total_tokens}
3435 }}
3436 }}"#,
3437 );
3438
3439 server
3440 .mock("POST", "/v1/tokens")
3441 .expect(1)
3442 .with_chunked_body(move |w| {
3443 let current = concurrent.fetch_add(1, Ordering::SeqCst);
3445 max_conc.fetch_max(current + 1, Ordering::SeqCst);
3446
3447 std::thread::sleep(Duration::from_millis(10));
3449
3450 concurrent.fetch_sub(1, Ordering::SeqCst);
3451
3452 w.write_all(response.as_bytes())
3453 })
3454 .create_async()
3455 .await;
3456 }
3457
3458 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3459 .expect("create client");
3460
3461 let tokens = client
3462 .get_all_tokens(
3463 AllTokensParams::new(Chain::Ethereum, allowed_concurrency)
3464 .with_chunk_size(page_size),
3465 )
3466 .await
3467 .expect("get all tokens");
3468
3469 let max = max_concurrent.load(Ordering::SeqCst);
3471 let expected_max_concurrency = (total_pages as usize)
3472 .saturating_sub(1)
3473 .min(allowed_concurrency);
3474 assert!(
3475 max <= allowed_concurrency,
3476 "Expected max concurrent requests <= {allowed_concurrency}, got {max}"
3477 );
3478
3479 if total_pages > 1 && expected_max_concurrency > 1 {
3481 assert!(
3482 max > 0,
3483 "Expected some concurrent requests for multi-page response, got {max}"
3484 );
3485 }
3486
3487 assert_eq!(
3489 tokens.len(),
3490 total_tokens,
3491 "Expected {total_tokens} tokens, got {}",
3492 tokens.len()
3493 );
3494
3495 for (i, token) in tokens.iter().enumerate() {
3497 assert_eq!(token.symbol, format!("TOKEN_{i}"), "Token at index {i} has wrong symbol");
3498 }
3499 }
3500}