1use std::{
7 collections::HashMap,
8 sync::Arc,
9 time::{Duration, SystemTime},
10};
11
12use async_trait::async_trait;
13use backoff::{exponential::ExponentialBackoffBuilder, ExponentialBackoff};
14use futures03::future::try_join_all;
15#[cfg(test)]
16use mockall::automock;
17use reqwest::{header, Client, ClientBuilder, Response, StatusCode, Url};
18use serde::Serialize;
19use thiserror::Error;
20use time::{format_description::well_known::Rfc2822, OffsetDateTime};
21use tokio::{
22 sync::{RwLock, Semaphore},
23 time::sleep,
24};
25use tracing::{debug, error, instrument, trace, warn};
26use tycho_common::{
27 dto::{
28 ComponentTvlRequestBody, ComponentTvlRequestResponse, PaginationLimits, PaginationParams,
29 ProtocolComponentRequestResponse, ProtocolComponentsRequestBody, ProtocolStateRequestBody,
30 ProtocolStateRequestResponse, ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse,
31 StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
32 TracedEntryPointRequestBody, TracedEntryPointRequestResponse, VersionParam,
33 },
34 models::{
35 blockchain::{EntryPointWithTracingParams, TracedEntryPoints, TracingResult},
36 contract::Account,
37 protocol::{ProtocolComponent, ProtocolComponentState},
38 token::Token,
39 Chain, ComponentId,
40 },
41 Bytes,
42};
43
44#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct ProtocolSystems {
47 protocol_systems: Vec<String>,
48 dci_protocols: Vec<String>,
49}
50
51impl ProtocolSystems {
52 pub(crate) fn new(protocol_systems: Vec<String>, dci_protocols: Vec<String>) -> Self {
53 Self { protocol_systems, dci_protocols }
54 }
55
56 pub fn protocol_systems(&self) -> &[String] {
57 &self.protocol_systems
58 }
59
60 pub fn dci_protocols(&self) -> &[String] {
61 &self.dci_protocols
62 }
63}
64
65#[derive(Debug, Clone, PartialEq)]
67pub struct Page<T> {
68 data: T,
69 total: i64,
70 page: i64,
71 page_size: i64,
72}
73
74impl<T> Page<T> {
75 pub fn new(data: T, total: i64, page: i64, page_size: i64) -> Self {
76 Page { data, total, page, page_size }
77 }
78
79 pub fn data(&self) -> &T {
80 &self.data
81 }
82
83 pub fn into_data(self) -> T {
84 self.data
85 }
86
87 pub fn total(&self) -> i64 {
88 self.total
89 }
90
91 pub fn page(&self) -> i64 {
92 self.page
93 }
94
95 pub fn page_size(&self) -> i64 {
96 self.page_size
97 }
98}
99
100impl<T> Page<Vec<T>> {
101 pub fn len(&self) -> usize {
102 self.data.len()
103 }
104
105 pub fn is_empty(&self) -> bool {
106 self.data.is_empty()
107 }
108}
109
110impl<K, V, S: std::hash::BuildHasher> Page<HashMap<K, V, S>> {
111 pub fn len(&self) -> usize {
112 self.data.len()
113 }
114
115 pub fn is_empty(&self) -> bool {
116 self.data.is_empty()
117 }
118}
119
120impl<T: IntoIterator> IntoIterator for Page<T> {
121 type Item = T::Item;
122 type IntoIter = T::IntoIter;
123
124 fn into_iter(self) -> Self::IntoIter {
125 self.data.into_iter()
126 }
127}
128
129impl<'a, T> IntoIterator for &'a Page<T>
130where
131 &'a T: IntoIterator,
132{
133 type Item = <&'a T as IntoIterator>::Item;
134 type IntoIter = <&'a T as IntoIterator>::IntoIter;
135
136 fn into_iter(self) -> Self::IntoIter {
137 (&self.data).into_iter()
138 }
139}
140
141use crate::{
142 feed::synchronizer::{ComponentWithState, Snapshot},
143 TYCHO_SERVER_VERSION,
144};
145
146pub const RPC_CLIENT_CONCURRENCY: usize = 4;
148
149#[derive(Clone, PartialEq, Debug)]
151pub struct ContractStateParams {
152 chain: Chain,
153 protocol_system: String,
154 contract_ids: Option<Vec<Bytes>>,
155 version: VersionParam,
156 page: i64,
157 page_size: i64,
158}
159
160impl ContractStateParams {
161 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
162 Self {
163 chain,
164 protocol_system: protocol_system.into(),
165 contract_ids: None,
166 version: VersionParam::default(),
167 page: 0,
168 page_size: StateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
169 }
170 }
171
172 pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
173 self.contract_ids = Some(ids);
174 self
175 }
176
177 pub fn with_version(mut self, version: VersionParam) -> Self {
178 self.version = version;
179 self
180 }
181
182 pub fn with_block_number(mut self, block_number: u64) -> Self {
183 self.version = VersionParam::at_block(self.chain.into(), block_number);
184 self
185 }
186
187 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
188 self.page = page;
189 self.page_size = page_size;
190 self
191 }
192}
193
194#[derive(Clone, PartialEq, Debug)]
196pub struct ProtocolComponentsParams {
197 chain: Chain,
198 protocol_system: String,
199 component_ids: Option<Vec<ComponentId>>,
200 tvl_gt: Option<f64>,
201 page: i64,
202 page_size: i64,
203}
204
205impl ProtocolComponentsParams {
206 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
207 Self {
208 chain,
209 protocol_system: protocol_system.into(),
210 component_ids: None,
211 tvl_gt: None,
212 page: 0,
213 page_size: ProtocolComponentsRequestBody::MAX_PAGE_SIZE_COMPRESSED,
214 }
215 }
216
217 pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
218 self.component_ids = Some(ids);
219 self
220 }
221
222 pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
223 self.tvl_gt = Some(tvl_gt);
224 self
225 }
226
227 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
228 self.page = page;
229 self.page_size = page_size;
230 self
231 }
232
233 #[cfg(test)]
234 pub(crate) fn component_ids(&self) -> Option<&Vec<ComponentId>> {
235 self.component_ids.as_ref()
236 }
237}
238
239#[derive(Clone, PartialEq, Debug)]
241pub struct ProtocolStatesParams {
242 chain: Chain,
243 protocol_system: String,
244 protocol_ids: Option<Vec<String>>,
245 include_balances: bool,
246 version: VersionParam,
247 page: i64,
248 page_size: i64,
249}
250
251impl ProtocolStatesParams {
252 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
253 Self {
254 chain,
255 protocol_system: protocol_system.into(),
256 protocol_ids: None,
257 include_balances: false,
258 version: VersionParam::default(),
259 page: 0,
260 page_size: ProtocolStateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
261 }
262 }
263
264 pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
265 self.protocol_ids = Some(ids);
266 self
267 }
268
269 pub fn with_include_balances(mut self, include_balances: bool) -> Self {
270 self.include_balances = include_balances;
271 self
272 }
273
274 pub fn with_version(mut self, version: VersionParam) -> Self {
275 self.version = version;
276 self
277 }
278
279 pub fn with_block_number(mut self, block_number: u64) -> Self {
280 self.version = VersionParam::at_block(self.chain.into(), block_number);
281 self
282 }
283
284 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
285 self.page = page;
286 self.page_size = page_size;
287 self
288 }
289}
290
291#[derive(Clone, PartialEq, Debug)]
293pub struct TokensParams {
294 chain: Chain,
295 min_quality: Option<i32>,
296 traded_n_days_ago: Option<u64>,
297 page: i64,
298 page_size: i64,
299}
300
301impl TokensParams {
302 pub fn new(chain: Chain) -> Self {
303 Self {
304 chain,
305 min_quality: None,
306 traded_n_days_ago: None,
307 page: 0,
308 page_size: TokensRequestBody::MAX_PAGE_SIZE_COMPRESSED,
309 }
310 }
311
312 pub fn with_min_quality(mut self, min_quality: i32) -> Self {
313 self.min_quality = Some(min_quality);
314 self
315 }
316
317 pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
318 self.traded_n_days_ago = Some(days);
319 self
320 }
321
322 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
323 self.page = page;
324 self.page_size = page_size;
325 self
326 }
327}
328
329#[derive(Clone, PartialEq, Debug)]
331pub struct ProtocolSystemsParams {
332 chain: Chain,
333 page: i64,
334 page_size: i64,
335}
336
337impl ProtocolSystemsParams {
338 pub fn new(chain: Chain) -> Self {
339 Self { chain, page: 0, page_size: ProtocolSystemsRequestBody::MAX_PAGE_SIZE_COMPRESSED }
340 }
341
342 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
343 self.page = page;
344 self.page_size = page_size;
345 self
346 }
347}
348
349#[derive(Clone, PartialEq, Debug)]
351pub struct ComponentTvlParams {
352 chain: Chain,
353 protocol_system: Option<String>,
354 component_ids: Option<Vec<String>>,
355 page: i64,
356 page_size: i64,
357}
358
359impl ComponentTvlParams {
360 pub fn new(chain: Chain) -> Self {
361 Self {
362 chain,
363 protocol_system: None,
364 component_ids: None,
365 page: 0,
366 page_size: ComponentTvlRequestBody::MAX_PAGE_SIZE_COMPRESSED,
367 }
368 }
369
370 pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
371 self.protocol_system = Some(protocol_system.into());
372 self
373 }
374
375 pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
376 self.component_ids = Some(ids);
377 self
378 }
379
380 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
381 self.page = page;
382 self.page_size = page_size;
383 self
384 }
385}
386
387#[derive(Clone, PartialEq, Debug)]
389pub struct TracedEntryPointsParams {
390 chain: Chain,
391 protocol_system: String,
392 component_ids: Option<Vec<String>>,
393 page: i64,
394 page_size: i64,
395}
396
397impl TracedEntryPointsParams {
398 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
399 Self {
400 chain,
401 protocol_system: protocol_system.into(),
402 component_ids: None,
403 page: 0,
404 page_size: TracedEntryPointRequestBody::MAX_PAGE_SIZE_COMPRESSED,
405 }
406 }
407
408 pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
409 self.component_ids = Some(ids);
410 self
411 }
412
413 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
414 self.page = page;
415 self.page_size = page_size;
416 self
417 }
418}
419
420#[derive(Clone, PartialEq, Debug)]
422pub struct ProtocolComponentsPaginatedParams {
423 chain: Chain,
424 protocol_system: String,
425 component_ids: Option<Vec<ComponentId>>,
426 tvl_gt: Option<f64>,
427 chunk_size: Option<usize>,
428 concurrency: usize,
429}
430
431impl ProtocolComponentsPaginatedParams {
432 pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
433 Self {
434 chain,
435 protocol_system: protocol_system.into(),
436 component_ids: None,
437 tvl_gt: None,
438 chunk_size: None,
439 concurrency,
440 }
441 }
442
443 pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
444 self.component_ids = Some(ids);
445 self
446 }
447
448 pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
449 self.tvl_gt = Some(tvl_gt);
450 self
451 }
452
453 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
454 self.chunk_size = Some(chunk_size);
455 self
456 }
457}
458
459#[derive(Clone, PartialEq, Debug)]
461pub struct TracedEntryPointsPaginatedParams {
462 chain: Chain,
463 protocol_system: String,
464 component_ids: Vec<String>,
465 chunk_size: Option<usize>,
466 concurrency: usize,
467}
468
469impl TracedEntryPointsPaginatedParams {
470 pub fn new(
471 chain: Chain,
472 protocol_system: impl Into<String>,
473 component_ids: Vec<String>,
474 concurrency: usize,
475 ) -> Self {
476 Self {
477 chain,
478 protocol_system: protocol_system.into(),
479 component_ids,
480 chunk_size: None,
481 concurrency,
482 }
483 }
484
485 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
486 self.chunk_size = Some(chunk_size);
487 self
488 }
489}
490
491#[derive(Clone, PartialEq, Debug)]
493pub struct ProtocolStatesPaginatedParams {
494 chain: Chain,
495 protocol_system: String,
496 protocol_ids: Vec<String>,
497 include_balances: bool,
498 version: VersionParam,
499 chunk_size: Option<usize>,
500 concurrency: usize,
501}
502
503impl ProtocolStatesPaginatedParams {
504 pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
505 Self {
506 chain,
507 protocol_system: protocol_system.into(),
508 protocol_ids: Vec::new(),
509 include_balances: true,
510 version: VersionParam::default(),
511 chunk_size: None,
512 concurrency,
513 }
514 }
515
516 pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
517 self.protocol_ids = ids;
518 self
519 }
520
521 pub fn with_include_balances(mut self, include_balances: bool) -> Self {
522 self.include_balances = include_balances;
523 self
524 }
525
526 pub fn with_version(mut self, version: VersionParam) -> Self {
527 self.version = version;
528 self
529 }
530
531 pub fn with_block_number(mut self, block_number: u64) -> Self {
532 self.version = VersionParam::at_block(self.chain.into(), block_number);
533 self
534 }
535
536 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
537 self.chunk_size = Some(chunk_size);
538 self
539 }
540}
541
542#[derive(Clone, PartialEq, Debug)]
544pub struct AllTokensParams {
545 chain: Chain,
546 min_quality: Option<i32>,
547 traded_n_days_ago: Option<u64>,
548 chunk_size: Option<usize>,
549 concurrency: usize,
550}
551
552impl AllTokensParams {
553 pub fn new(chain: Chain, concurrency: usize) -> Self {
554 Self { chain, min_quality: None, traded_n_days_ago: None, chunk_size: None, concurrency }
555 }
556
557 pub fn with_min_quality(mut self, min_quality: i32) -> Self {
558 self.min_quality = Some(min_quality);
559 self
560 }
561
562 pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
563 self.traded_n_days_ago = Some(days);
564 self
565 }
566
567 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
568 self.chunk_size = Some(chunk_size);
569 self
570 }
571}
572
573#[derive(Clone, PartialEq, Debug)]
575pub struct ComponentTvlPaginatedParams {
576 chain: Chain,
577 protocol_system: Option<String>,
578 component_ids: Option<Vec<String>>,
579 chunk_size: Option<usize>,
580 concurrency: usize,
581}
582
583impl ComponentTvlPaginatedParams {
584 pub fn new(chain: Chain, concurrency: usize) -> Self {
585 Self { chain, protocol_system: None, component_ids: None, chunk_size: None, concurrency }
586 }
587
588 pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
589 self.protocol_system = Some(protocol_system.into());
590 self
591 }
592
593 pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
594 self.component_ids = Some(ids);
595 self
596 }
597
598 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
599 self.chunk_size = Some(chunk_size);
600 self
601 }
602}
603
604#[derive(Clone, PartialEq, Debug)]
606pub struct ContractStatePaginatedParams {
607 chain: Chain,
608 protocol_system: String,
609 contract_ids: Vec<Bytes>,
610 version: VersionParam,
611 chunk_size: Option<usize>,
612 concurrency: usize,
613}
614
615impl ContractStatePaginatedParams {
616 pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
617 Self {
618 chain,
619 protocol_system: protocol_system.into(),
620 contract_ids: Vec::new(),
621 version: VersionParam::default(),
622 chunk_size: None,
623 concurrency,
624 }
625 }
626
627 pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
628 self.contract_ids = ids;
629 self
630 }
631
632 pub fn with_version(mut self, version: VersionParam) -> Self {
633 self.version = version;
634 self
635 }
636
637 pub fn with_block_number(mut self, block_number: u64) -> Self {
638 self.version = VersionParam::at_block(self.chain.into(), block_number);
639 self
640 }
641
642 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
643 self.chunk_size = Some(chunk_size);
644 self
645 }
646}
647
648#[derive(Clone, Debug, PartialEq)]
653pub struct SnapshotParameters<'a> {
654 pub chain: Chain,
656 pub protocol_system: &'a str,
658 pub components: &'a HashMap<ComponentId, ProtocolComponent>,
660 pub entrypoints: Option<&'a TracedEntryPoints>,
662 pub contract_ids: &'a [Bytes],
664 pub block_number: u64,
666 pub include_balances: bool,
668 pub include_tvl: bool,
670}
671
672impl<'a> SnapshotParameters<'a> {
673 pub fn new(
674 chain: Chain,
675 protocol_system: &'a str,
676 components: &'a HashMap<ComponentId, ProtocolComponent>,
677 contract_ids: &'a [Bytes],
678 block_number: u64,
679 ) -> Self {
680 Self {
681 chain,
682 protocol_system,
683 components,
684 entrypoints: None,
685 contract_ids,
686 block_number,
687 include_balances: true,
688 include_tvl: true,
689 }
690 }
691
692 pub fn include_balances(mut self, include_balances: bool) -> Self {
694 self.include_balances = include_balances;
695 self
696 }
697
698 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
700 self.include_tvl = include_tvl;
701 self
702 }
703
704 pub fn entrypoints(mut self, entrypoints: &'a TracedEntryPoints) -> Self {
705 self.entrypoints = Some(entrypoints);
706 self
707 }
708}
709
710#[derive(Error, Debug)]
711pub enum RPCError {
712 #[error("Failed to parse URL: {0}. Error: {1}")]
714 UrlParsing(String, String),
715
716 #[error("Failed to format request: {0}")]
718 FormatRequest(String),
719
720 #[error("Unexpected HTTP client error: {0}")]
722 HttpClient(String, #[source] reqwest::Error),
723
724 #[error("Failed to parse response: {0}")]
726 ParseResponse(String),
727
728 #[error("Snapshot block is stale: {0}")]
730 StaleBlock(String),
731
732 #[error("Unknown extractor: {0}")]
734 UnknownExtractor(String),
735
736 #[error("Fatal error: {0}")]
738 Fatal(String),
739
740 #[error("Rate limited until {0:?}")]
741 RateLimited(Option<SystemTime>),
742
743 #[error("Server unreachable: {0}")]
744 ServerUnreachable(String),
745}
746
747impl RPCError {
748 fn from_parse_error(err: serde_json::Error, body: &str) -> Self {
758 if body.contains("version is older than") || body.contains("Could not find Block") {
759 RPCError::StaleBlock(body.to_string())
760 } else if body.starts_with("Unknown extractor:") {
761 RPCError::UnknownExtractor(body.to_string())
762 } else {
763 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
764 }
765 }
766}
767
768#[cfg_attr(test, automock)]
769#[async_trait]
770pub trait RPCClient: Send + Sync {
771 fn compression(&self) -> bool;
773
774 async fn get_contract_state(
778 &self,
779 params: ContractStateParams,
780 ) -> Result<Page<Vec<Account>>, RPCError>;
781
782 async fn get_contract_state_paginated(
786 &self,
787 params: ContractStatePaginatedParams,
788 ) -> Result<Vec<Account>, RPCError> {
789 let semaphore = Arc::new(Semaphore::new(params.concurrency));
790
791 let mut sorted_ids = params.contract_ids;
793 sorted_ids.sort();
794
795 let chunk_size = params
796 .chunk_size
797 .unwrap_or(StateRequestBody::effective_max_page_size(self.compression()) as usize);
798
799 let mut tasks = Vec::new();
800 for chunk in sorted_ids.chunks(chunk_size) {
801 let sem = semaphore.clone();
802 let base_params =
803 ContractStateParams::new(params.chain, params.protocol_system.as_str())
804 .with_contract_ids(chunk.to_vec())
805 .with_version(params.version.clone())
806 .with_pagination(0, chunk_size as i64);
807 tasks.push(async move {
808 let _permit = sem
809 .acquire()
810 .await
811 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
812 self.get_contract_state(base_params)
813 .await
814 });
815 }
816
817 let pages = try_join_all(tasks).await?;
818
819 let accounts = pages
820 .into_iter()
821 .flat_map(|p| p.into_iter())
822 .collect();
823
824 Ok(accounts)
825 }
826
827 async fn get_protocol_components(
831 &self,
832 params: ProtocolComponentsParams,
833 ) -> Result<Page<Vec<ProtocolComponent>>, RPCError>;
834
835 async fn get_protocol_components_paginated(
839 &self,
840 params: ProtocolComponentsPaginatedParams,
841 ) -> Result<Vec<ProtocolComponent>, RPCError> {
842 let chain = params.chain;
843 let protocol_system = params.protocol_system;
844 let component_ids = params.component_ids;
845 let tvl_gt = params.tvl_gt;
846 let chunk_size = params.chunk_size;
847 let concurrency = params.concurrency;
848
849 let semaphore = Arc::new(Semaphore::new(concurrency));
850
851 let chunk_size = chunk_size.unwrap_or(
852 ProtocolComponentsRequestBody::effective_max_page_size(self.compression()) as usize,
853 );
854
855 match component_ids {
858 Some(ids) => {
859 let tasks: Vec<_> =
860 ids.chunks(chunk_size)
861 .enumerate()
862 .map(|(index, chunk)| {
863 let sem = semaphore.clone();
864 let mut base =
865 ProtocolComponentsParams::new(chain, protocol_system.as_str())
866 .with_component_ids(chunk.to_vec())
867 .with_pagination(index as i64, chunk_size as i64);
868 if let Some(tvl) = tvl_gt {
869 base = base.with_tvl_gt(tvl);
870 }
871 async move {
872 let _permit = sem.acquire().await.map_err(|_| {
873 RPCError::Fatal("Semaphore dropped".to_string())
874 })?;
875 self.get_protocol_components(base).await
876 }
877 })
878 .collect();
879
880 try_join_all(tasks)
881 .await
882 .map(|pages| pages.into_iter().flatten().collect())
883 }
884 None => {
885 let mut base_params =
888 ProtocolComponentsParams::new(chain, protocol_system.as_str())
889 .with_pagination(0, chunk_size as i64);
890 if let Some(tvl) = tvl_gt {
891 base_params = base_params.with_tvl_gt(tvl);
892 }
893
894 let first_page = self
895 .get_protocol_components(base_params)
896 .await?;
897
898 let total_items = first_page.total();
899 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
900
901 let mut all: Vec<ProtocolComponent> = first_page.into_data();
902
903 let mut page = 1;
904 while page < total_pages {
905 let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
906
907 let tasks: Vec<_> = (0..requests_in_this_iteration)
908 .map(|iter| {
909 let sem = semaphore.clone();
910 let mut p =
911 ProtocolComponentsParams::new(chain, protocol_system.as_str())
912 .with_pagination(page + iter, chunk_size as i64);
913 if let Some(tvl) = tvl_gt {
914 p = p.with_tvl_gt(tvl);
915 }
916 async move {
917 let _permit = sem.acquire().await.map_err(|_| {
918 RPCError::Fatal("Semaphore dropped".to_string())
919 })?;
920 self.get_protocol_components(p).await
921 }
922 })
923 .collect();
924
925 let responses = try_join_all(tasks).await?;
926
927 for resp in responses {
928 all.extend(resp);
929 }
930
931 page += requests_in_this_iteration;
932 }
933 Ok(all)
934 }
935 }
936 }
937
938 async fn get_protocol_states(
942 &self,
943 params: ProtocolStatesParams,
944 ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError>;
945
946 async fn get_protocol_states_paginated(
950 &self,
951 params: ProtocolStatesPaginatedParams,
952 ) -> Result<Vec<ProtocolComponentState>, RPCError> {
953 let semaphore = Arc::new(Semaphore::new(params.concurrency));
954
955 let chunk_size =
956 params
957 .chunk_size
958 .unwrap_or(
959 ProtocolStateRequestBody::effective_max_page_size(self.compression()) as usize
960 );
961
962 let tasks: Vec<_> = params
963 .protocol_ids
964 .chunks(chunk_size)
965 .map(|c| {
966 let sem = semaphore.clone();
967 let p = ProtocolStatesParams::new(params.chain, params.protocol_system.as_str())
968 .with_protocol_ids(c.to_vec())
969 .with_include_balances(params.include_balances)
970 .with_version(params.version.clone())
971 .with_pagination(0, chunk_size as i64);
972 async move {
973 let _permit = sem
974 .acquire()
975 .await
976 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
977 self.get_protocol_states(p).await
978 }
979 })
980 .collect();
981
982 try_join_all(tasks)
983 .await
984 .map(|pages| pages.into_iter().flatten().collect())
985 }
986
987 async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError>;
991
992 async fn get_all_tokens(&self, params: AllTokensParams) -> Result<Vec<Token>, RPCError> {
996 let chunk_size = params
997 .chunk_size
998 .unwrap_or(TokensRequestBody::effective_max_page_size(self.compression()) as usize);
999
1000 let semaphore = Arc::new(Semaphore::new(params.concurrency));
1001
1002 let page_size: i64 = chunk_size.try_into().map_err(|_| {
1003 RPCError::FormatRequest("Failed to convert chunk_size into i64".to_string())
1004 })?;
1005
1006 let mut base_params = TokensParams::new(params.chain).with_pagination(0, page_size);
1007 if let Some(q) = params.min_quality {
1008 base_params = base_params.with_min_quality(q);
1009 }
1010 if let Some(d) = params.traded_n_days_ago {
1011 base_params = base_params.with_traded_n_days_ago(d);
1012 }
1013
1014 let first_page = self.get_tokens(base_params).await?;
1015 let total_pages = (first_page.total() as f64 / chunk_size as f64).ceil() as i64;
1016
1017 let mut all_tokens: Vec<Token> = first_page.into_data();
1018
1019 if total_pages <= 1 {
1020 return Ok(all_tokens);
1021 }
1022
1023 let tasks: Vec<_> = (1..total_pages)
1024 .map(|page| {
1025 let sem = semaphore.clone();
1026 let mut p = TokensParams::new(params.chain).with_pagination(page, page_size);
1027 if let Some(q) = params.min_quality {
1028 p = p.with_min_quality(q);
1029 }
1030 if let Some(d) = params.traded_n_days_ago {
1031 p = p.with_traded_n_days_ago(d);
1032 }
1033 async move {
1034 let _permit = sem
1035 .acquire()
1036 .await
1037 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1038 self.get_tokens(p).await
1039 }
1040 })
1041 .collect();
1042
1043 let pages = try_join_all(tasks).await?;
1044 for page in pages {
1045 all_tokens.extend(page);
1046 }
1047
1048 Ok(all_tokens)
1049 }
1050
1051 async fn get_protocol_systems(
1053 &self,
1054 params: ProtocolSystemsParams,
1055 ) -> Result<Page<ProtocolSystems>, RPCError>;
1056
1057 async fn get_component_tvl(
1061 &self,
1062 params: ComponentTvlParams,
1063 ) -> Result<Page<HashMap<String, f64>>, RPCError>;
1064
1065 async fn get_component_tvl_paginated(
1069 &self,
1070 params: ComponentTvlPaginatedParams,
1071 ) -> Result<HashMap<String, f64>, RPCError> {
1072 let semaphore = Arc::new(Semaphore::new(params.concurrency));
1073
1074 let chunk_size =
1075 params
1076 .chunk_size
1077 .unwrap_or(
1078 ComponentTvlRequestBody::effective_max_page_size(self.compression()) as usize
1079 );
1080
1081 match params.component_ids {
1082 Some(ids) => {
1083 let tasks: Vec<_> =
1084 ids.chunks(chunk_size)
1085 .enumerate()
1086 .map(|(index, chunk)| {
1087 let sem = semaphore.clone();
1088 let mut p = ComponentTvlParams::new(params.chain)
1089 .with_component_ids(chunk.to_vec())
1090 .with_pagination(index as i64, chunk_size as i64);
1091 if let Some(ref ps) = params.protocol_system {
1092 p = p.with_protocol_system(ps.as_str());
1093 }
1094 async move {
1095 let _permit = sem.acquire().await.map_err(|_| {
1096 RPCError::Fatal("Semaphore dropped".to_string())
1097 })?;
1098 self.get_component_tvl(p).await
1099 }
1100 })
1101 .collect();
1102
1103 let pages = try_join_all(tasks).await?;
1104
1105 let mut merged_tvl = HashMap::new();
1106 for page in pages {
1107 for (key, value) in page {
1108 *merged_tvl.entry(key).or_insert(0.0) = value;
1109 }
1110 }
1111
1112 Ok(merged_tvl)
1113 }
1114 None => {
1115 let mut base =
1116 ComponentTvlParams::new(params.chain).with_pagination(0, chunk_size as i64);
1117 if let Some(ref ps) = params.protocol_system {
1118 base = base.with_protocol_system(ps.as_str());
1119 }
1120
1121 let first_page = self.get_component_tvl(base).await?;
1122 let total_items = first_page.total();
1123 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
1124
1125 let mut merged_tvl: HashMap<String, f64> = first_page.into_data();
1126
1127 let mut page = 1;
1128 while page < total_pages {
1129 let requests_in_this_iteration =
1130 (total_pages - page).min(params.concurrency as i64);
1131
1132 let tasks: Vec<_> = (0..requests_in_this_iteration)
1133 .map(|i| {
1134 let sem = semaphore.clone();
1135 let mut p = ComponentTvlParams::new(params.chain)
1136 .with_pagination(page + i, chunk_size as i64);
1137 if let Some(ref ps) = params.protocol_system {
1138 p = p.with_protocol_system(ps.as_str());
1139 }
1140 async move {
1141 let _permit = sem.acquire().await.map_err(|_| {
1142 RPCError::Fatal("Semaphore dropped".to_string())
1143 })?;
1144 self.get_component_tvl(p).await
1145 }
1146 })
1147 .collect();
1148
1149 let responses = try_join_all(tasks).await?;
1150
1151 for resp in responses {
1152 for (key, value) in resp {
1153 *merged_tvl.entry(key).or_insert(0.0) = value;
1154 }
1155 }
1156
1157 page += requests_in_this_iteration;
1158 }
1159
1160 Ok(merged_tvl)
1161 }
1162 }
1163 }
1164
1165 async fn get_traced_entry_points(
1169 &self,
1170 params: TracedEntryPointsParams,
1171 ) -> Result<Page<TracedEntryPoints>, RPCError>;
1172
1173 async fn get_traced_entry_points_paginated(
1177 &self,
1178 params: TracedEntryPointsPaginatedParams,
1179 ) -> Result<TracedEntryPoints, RPCError> {
1180 let chain = params.chain;
1181 let protocol_system = params.protocol_system;
1182 let component_ids = params.component_ids;
1183 let chunk_size = params.chunk_size;
1184 let concurrency = params.concurrency;
1185
1186 let semaphore = Arc::new(Semaphore::new(concurrency));
1187
1188 let chunk_size = chunk_size.unwrap_or(
1189 TracedEntryPointRequestBody::effective_max_page_size(self.compression()) as usize,
1190 );
1191
1192 let tasks: Vec<_> = component_ids
1193 .chunks(chunk_size)
1194 .map(|c| {
1195 let sem = semaphore.clone();
1196 let params = TracedEntryPointsParams::new(chain, protocol_system.as_str())
1197 .with_component_ids(c.to_vec())
1198 .with_pagination(0, chunk_size as i64);
1199 async move {
1200 let _permit = sem
1201 .acquire()
1202 .await
1203 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1204 self.get_traced_entry_points(params)
1205 .await
1206 }
1207 })
1208 .collect();
1209
1210 try_join_all(tasks)
1211 .await
1212 .map(|pages| pages.into_iter().flatten().collect())
1213 }
1214
1215 #[allow(clippy::extra_unused_lifetimes)]
1216 async fn get_snapshots<'a>(
1217 &self,
1218 request: &SnapshotParameters<'a>,
1219 chunk_size: Option<usize>,
1220 concurrency: usize,
1221 ) -> Result<Snapshot, RPCError>;
1222}
1223
1224#[derive(Debug, Clone)]
1226pub struct HttpRPCClientOptions {
1227 pub auth_key: Option<String>,
1229 pub compression: bool,
1232}
1233
1234impl Default for HttpRPCClientOptions {
1235 fn default() -> Self {
1236 Self::new()
1237 }
1238}
1239
1240impl HttpRPCClientOptions {
1241 pub fn new() -> Self {
1243 Self { auth_key: None, compression: true }
1244 }
1245
1246 pub fn with_auth_key(mut self, auth_key: Option<String>) -> Self {
1248 self.auth_key = auth_key;
1249 self
1250 }
1251
1252 pub fn with_compression(mut self, compression: bool) -> Self {
1254 self.compression = compression;
1255 self
1256 }
1257}
1258
1259#[derive(Debug, Clone)]
1260pub struct HttpRPCClient {
1261 http_client: Client,
1262 url: Url,
1263 retry_after: Arc<RwLock<Option<SystemTime>>>,
1264 backoff_policy: ExponentialBackoff,
1265 server_restart_duration: Duration,
1266 compression: bool,
1267}
1268
1269impl HttpRPCClient {
1270 pub fn new(base_uri: &str, options: HttpRPCClientOptions) -> Result<Self, RPCError> {
1271 let uri = base_uri
1272 .parse::<Url>()
1273 .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
1274
1275 let mut headers = header::HeaderMap::new();
1277 headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
1278 let user_agent = format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION"));
1279 headers.insert(
1280 header::USER_AGENT,
1281 header::HeaderValue::from_str(&user_agent)
1282 .map_err(|e| RPCError::FormatRequest(format!("Invalid user agent format: {e}")))?,
1283 );
1284
1285 if let Some(key) = options.auth_key.as_deref() {
1287 let mut auth_value = header::HeaderValue::from_str(key).map_err(|e| {
1288 RPCError::FormatRequest(format!("Invalid authorization key format: {e}"))
1289 })?;
1290 auth_value.set_sensitive(true);
1291 headers.insert(header::AUTHORIZATION, auth_value);
1292 }
1293
1294 let mut client_builder = ClientBuilder::new()
1295 .default_headers(headers)
1296 .http2_prior_knowledge();
1297
1298 if !options.compression {
1300 client_builder = client_builder.no_zstd();
1301 }
1302
1303 let client = client_builder
1304 .build()
1305 .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1306
1307 Ok(Self {
1308 http_client: client,
1309 url: uri,
1310 retry_after: Arc::new(RwLock::new(None)),
1311 backoff_policy: ExponentialBackoffBuilder::new()
1312 .with_initial_interval(Duration::from_millis(250))
1313 .with_multiplier(1.75)
1315 .with_max_interval(Duration::from_secs(30))
1317 .with_max_elapsed_time(Some(Duration::from_secs(125)))
1319 .build(),
1320 server_restart_duration: Duration::from_secs(120),
1321 compression: options.compression,
1322 })
1323 }
1324
1325 #[cfg(test)]
1326 pub fn with_test_backoff_policy(mut self) -> Self {
1327 self.backoff_policy = ExponentialBackoffBuilder::new()
1329 .with_initial_interval(Duration::from_millis(1))
1330 .with_multiplier(1.1)
1331 .with_max_interval(Duration::from_millis(5))
1332 .with_max_elapsed_time(Some(Duration::from_millis(50)))
1333 .build();
1334 self.server_restart_duration = Duration::from_millis(50);
1335 self
1336 }
1337
1338 async fn error_for_response(
1344 &self,
1345 response: reqwest::Response,
1346 ) -> Result<reqwest::Response, RPCError> {
1347 match response.status() {
1348 StatusCode::TOO_MANY_REQUESTS => {
1349 let retry_after_raw = response
1350 .headers()
1351 .get(reqwest::header::RETRY_AFTER)
1352 .and_then(|h| h.to_str().ok())
1353 .and_then(parse_retry_value);
1354
1355 let reason = response
1356 .text()
1357 .await
1358 .unwrap_or_default();
1359 warn!(reason, retry_after = ?retry_after_raw, "Rate limited by server");
1360
1361 Err(RPCError::RateLimited(retry_after_raw))
1362 }
1363 StatusCode::BAD_GATEWAY |
1364 StatusCode::SERVICE_UNAVAILABLE |
1365 StatusCode::GATEWAY_TIMEOUT => Err(RPCError::ServerUnreachable(
1366 response
1367 .text()
1368 .await
1369 .unwrap_or_else(|_| "Server Unreachable".to_string()),
1370 )),
1371 _ => Ok(response),
1372 }
1373 }
1374
1375 async fn handle_error_for_backoff(&self, e: RPCError) -> backoff::Error<RPCError> {
1381 match e {
1382 RPCError::ServerUnreachable(_) => {
1383 backoff::Error::retry_after(e, self.server_restart_duration)
1384 }
1385 RPCError::RateLimited(Some(until)) => {
1386 let mut retry_after_guard = self.retry_after.write().await;
1387 *retry_after_guard = Some(
1388 retry_after_guard
1389 .unwrap_or(until)
1390 .max(until),
1391 );
1392
1393 if let Ok(duration) = until.duration_since(SystemTime::now()) {
1394 backoff::Error::retry_after(e, duration)
1395 } else {
1396 e.into()
1397 }
1398 }
1399 RPCError::RateLimited(None) => e.into(),
1400 _ => backoff::Error::permanent(e),
1401 }
1402 }
1403
1404 async fn wait_until_retry_after(&self) {
1409 if let Some(&until) = self.retry_after.read().await.as_ref() {
1410 let now = SystemTime::now();
1411 if until > now {
1412 if let Ok(duration) = until.duration_since(now) {
1413 sleep(duration).await
1414 }
1415 }
1416 }
1417 }
1418
1419 async fn make_post_request<T: Serialize + ?Sized>(
1424 &self,
1425 request: &T,
1426 uri: &String,
1427 ) -> Result<Response, RPCError> {
1428 self.wait_until_retry_after().await;
1429 let response = backoff::future::retry(self.backoff_policy.clone(), || async {
1430 let server_response = self
1431 .http_client
1432 .post(uri)
1433 .json(request)
1434 .send()
1435 .await
1436 .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1437
1438 match self
1439 .error_for_response(server_response)
1440 .await
1441 {
1442 Ok(response) => Ok(response),
1443 Err(e) => Err(self.handle_error_for_backoff(e).await),
1444 }
1445 })
1446 .await?;
1447 Ok(response)
1448 }
1449}
1450
1451fn parse_retry_value(val: &str) -> Option<SystemTime> {
1452 if let Ok(secs) = val.parse::<u64>() {
1453 return Some(SystemTime::now() + Duration::from_secs(secs));
1454 }
1455 if let Ok(date) = OffsetDateTime::parse(val, &Rfc2822) {
1456 return Some(date.into());
1457 }
1458 None
1459}
1460
1461#[async_trait]
1462impl RPCClient for HttpRPCClient {
1463 fn compression(&self) -> bool {
1464 self.compression
1465 }
1466
1467 #[instrument(skip(self))]
1468 async fn get_contract_state(
1469 &self,
1470 params: ContractStateParams,
1471 ) -> Result<Page<Vec<Account>>, RPCError> {
1472 if params
1473 .contract_ids
1474 .as_ref()
1475 .is_none_or(|ids| ids.is_empty())
1476 {
1477 warn!("No contract ids specified in request.");
1478 }
1479
1480 let request = StateRequestBody {
1481 contract_ids: params.contract_ids,
1482 protocol_system: params.protocol_system,
1483 chain: params.chain.into(),
1484 version: params.version,
1485 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1486 };
1487
1488 let uri = format!(
1489 "{}/{}/contract_state",
1490 self.url
1491 .to_string()
1492 .trim_end_matches('/'),
1493 TYCHO_SERVER_VERSION
1494 );
1495 debug!(%uri, "Sending contract_state request to Tycho server");
1496 trace!(?request, "Sending request to Tycho server");
1497 let response = self
1498 .make_post_request(&request, &uri)
1499 .await?;
1500 trace!(?response, "Received response from Tycho server");
1501
1502 let body = response
1503 .text()
1504 .await
1505 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1506 if body.is_empty() {
1507 return Ok(Page::new(vec![], 0, 0, 0));
1509 }
1510
1511 let dto_response = serde_json::from_str::<StateRequestResponse>(&body)
1512 .map_err(|err| RPCError::from_parse_error(err, &body))?;
1513 trace!(?dto_response, "Received contract_state response from Tycho server");
1514
1515 let data: Vec<Account> = dto_response
1516 .accounts
1517 .into_iter()
1518 .map(Account::from)
1519 .collect();
1520 Ok(Page::new(
1521 data,
1522 dto_response.pagination.total,
1523 dto_response.pagination.page,
1524 dto_response.pagination.page_size,
1525 ))
1526 }
1527
1528 async fn get_protocol_components(
1529 &self,
1530 params: ProtocolComponentsParams,
1531 ) -> Result<Page<Vec<ProtocolComponent>>, RPCError> {
1532 let request = ProtocolComponentsRequestBody {
1533 protocol_system: params.protocol_system,
1534 component_ids: params.component_ids,
1535 tvl_gt: params.tvl_gt,
1536 chain: params.chain.into(),
1537 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1538 };
1539
1540 let uri = format!(
1541 "{}/{}/protocol_components",
1542 self.url
1543 .to_string()
1544 .trim_end_matches('/'),
1545 TYCHO_SERVER_VERSION,
1546 );
1547 debug!(%uri, "Sending protocol_components request to Tycho server");
1548 trace!(?request, "Sending request to Tycho server");
1549
1550 let response = self
1551 .make_post_request(&request, &uri)
1552 .await?;
1553
1554 trace!(?response, "Received response from Tycho server");
1555
1556 let body = response
1557 .text()
1558 .await
1559 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1560 let dto_response = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
1561 .map_err(|err| RPCError::from_parse_error(err, &body))?;
1562 trace!(?dto_response, "Received protocol_components response from Tycho server");
1563
1564 let data: Vec<ProtocolComponent> = dto_response
1565 .protocol_components
1566 .into_iter()
1567 .map(ProtocolComponent::from)
1568 .collect();
1569 Ok(Page::new(
1570 data,
1571 dto_response.pagination.total,
1572 dto_response.pagination.page,
1573 dto_response.pagination.page_size,
1574 ))
1575 }
1576
1577 async fn get_protocol_states(
1578 &self,
1579 params: ProtocolStatesParams,
1580 ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError> {
1581 if params
1582 .protocol_ids
1583 .as_ref()
1584 .is_none_or(|ids| ids.is_empty())
1585 {
1586 warn!("No protocol ids specified in request.");
1587 }
1588
1589 let request = ProtocolStateRequestBody {
1590 protocol_ids: params.protocol_ids,
1591 protocol_system: params.protocol_system,
1592 chain: params.chain.into(),
1593 include_balances: params.include_balances,
1594 version: params.version,
1595 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1596 };
1597
1598 let uri = format!(
1599 "{}/{}/protocol_state",
1600 self.url
1601 .to_string()
1602 .trim_end_matches('/'),
1603 TYCHO_SERVER_VERSION
1604 );
1605 debug!(%uri, "Sending protocol_states request to Tycho server");
1606 trace!(?request, "Sending request to Tycho server");
1607
1608 let response = self
1609 .make_post_request(&request, &uri)
1610 .await?;
1611 trace!(?response, "Received response from Tycho server");
1612
1613 let body = response
1614 .text()
1615 .await
1616 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1617
1618 if body.is_empty() {
1619 return Ok(Page::new(vec![], 0, 0, 0));
1621 }
1622
1623 let dto_response = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
1624 .map_err(|err| RPCError::from_parse_error(err, &body))?;
1625 trace!(?dto_response, "Received protocol_states response from Tycho server");
1626
1627 let data: Vec<ProtocolComponentState> = dto_response
1628 .states
1629 .into_iter()
1630 .map(ProtocolComponentState::from)
1631 .collect();
1632 Ok(Page::new(
1633 data,
1634 dto_response.pagination.total,
1635 dto_response.pagination.page,
1636 dto_response.pagination.page_size,
1637 ))
1638 }
1639
1640 async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError> {
1641 let request = TokensRequestBody {
1642 token_addresses: None,
1643 min_quality: params.min_quality,
1644 traded_n_days_ago: params.traded_n_days_ago,
1645 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1646 chain: params.chain.into(),
1647 };
1648
1649 let uri = format!(
1650 "{}/{}/tokens",
1651 self.url
1652 .to_string()
1653 .trim_end_matches('/'),
1654 TYCHO_SERVER_VERSION
1655 );
1656 debug!(%uri, "Sending tokens request to Tycho server");
1657
1658 let response = self
1659 .make_post_request(&request, &uri)
1660 .await?;
1661
1662 let body = response
1663 .text()
1664 .await
1665 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1666 let dto_response = serde_json::from_str::<TokensRequestResponse>(&body)
1667 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1668
1669 let data: Vec<Token> = dto_response
1670 .tokens
1671 .into_iter()
1672 .map(Token::from)
1673 .collect();
1674 Ok(Page::new(
1675 data,
1676 dto_response.pagination.total,
1677 dto_response.pagination.page,
1678 dto_response.pagination.page_size,
1679 ))
1680 }
1681
1682 async fn get_protocol_systems(
1683 &self,
1684 params: ProtocolSystemsParams,
1685 ) -> Result<Page<ProtocolSystems>, RPCError> {
1686 let request = ProtocolSystemsRequestBody {
1687 chain: params.chain.into(),
1688 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1689 };
1690
1691 let uri = format!(
1692 "{}/{}/protocol_systems",
1693 self.url
1694 .to_string()
1695 .trim_end_matches('/'),
1696 TYCHO_SERVER_VERSION
1697 );
1698 debug!(%uri, "Sending protocol_systems request to Tycho server");
1699 trace!(?request, "Sending request to Tycho server");
1700 let response = self
1701 .make_post_request(&request, &uri)
1702 .await?;
1703 trace!(?response, "Received response from Tycho server");
1704 let body = response
1705 .text()
1706 .await
1707 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1708 let dto = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
1709 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1710 trace!(?dto, "Received protocol_systems response from Tycho server");
1711 Ok(Page::new(
1712 ProtocolSystems::new(dto.protocol_systems, dto.dci_protocols),
1713 dto.pagination.total,
1714 dto.pagination.page,
1715 dto.pagination.page_size,
1716 ))
1717 }
1718
1719 async fn get_component_tvl(
1720 &self,
1721 params: ComponentTvlParams,
1722 ) -> Result<Page<HashMap<String, f64>>, RPCError> {
1723 let request = ComponentTvlRequestBody {
1724 chain: params.chain.into(),
1725 protocol_system: params.protocol_system,
1726 component_ids: params.component_ids,
1727 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1728 };
1729
1730 let uri = format!(
1731 "{}/{}/component_tvl",
1732 self.url
1733 .to_string()
1734 .trim_end_matches('/'),
1735 TYCHO_SERVER_VERSION
1736 );
1737 debug!(%uri, "Sending get_component_tvl request to Tycho server");
1738 trace!(?request, "Sending request to Tycho server");
1739 let response = self
1740 .make_post_request(&request, &uri)
1741 .await?;
1742 trace!(?response, "Received response from Tycho server");
1743 let body = response
1744 .text()
1745 .await
1746 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1747 let dto_response =
1748 serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
1749 error!("Failed to parse component_tvl response: {:?}", &body);
1750 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1751 })?;
1752 trace!(?dto_response, "Received component_tvl response from Tycho server");
1753 Ok(Page::new(
1754 dto_response.tvl,
1755 dto_response.pagination.total,
1756 dto_response.pagination.page,
1757 dto_response.pagination.page_size,
1758 ))
1759 }
1760
1761 async fn get_traced_entry_points(
1762 &self,
1763 params: TracedEntryPointsParams,
1764 ) -> Result<Page<TracedEntryPoints>, RPCError> {
1765 let request = TracedEntryPointRequestBody {
1766 chain: params.chain.into(),
1767 protocol_system: params.protocol_system,
1768 component_ids: params.component_ids,
1769 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1770 };
1771
1772 let uri = format!(
1773 "{}/{TYCHO_SERVER_VERSION}/traced_entry_points",
1774 self.url
1775 .to_string()
1776 .trim_end_matches('/')
1777 );
1778 debug!(%uri, "Sending traced_entry_points request to Tycho server");
1779 trace!(?request, "Sending request to Tycho server");
1780
1781 let response = self
1782 .make_post_request(&request, &uri)
1783 .await?;
1784
1785 trace!(?response, "Received response from Tycho server");
1786
1787 let body = response
1788 .text()
1789 .await
1790 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1791 let dto_response =
1792 serde_json::from_str::<TracedEntryPointRequestResponse>(&body).map_err(|err| {
1793 error!("Failed to parse traced_entry_points response: {:?}", &body);
1794 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1795 })?;
1796 trace!(?dto_response, "Received traced_entry_points response from Tycho server");
1797 let data: TracedEntryPoints = dto_response
1798 .traced_entry_points
1799 .into_iter()
1800 .map(|(k, v)| {
1801 (
1802 k,
1803 v.into_iter()
1804 .map(|(ep, tr)| {
1805 (EntryPointWithTracingParams::from(ep), TracingResult::from(tr))
1806 })
1807 .collect(),
1808 )
1809 })
1810 .collect();
1811 Ok(Page::new(
1812 data,
1813 dto_response.pagination.total,
1814 dto_response.pagination.page,
1815 dto_response.pagination.page_size,
1816 ))
1817 }
1818
1819 #[allow(clippy::extra_unused_lifetimes)]
1820 async fn get_snapshots<'a>(
1821 &self,
1822 request: &SnapshotParameters<'a>,
1823 chunk_size: Option<usize>,
1824 concurrency: usize,
1825 ) -> Result<Snapshot, RPCError> {
1826 let component_ids: Vec<_> = request
1827 .components
1828 .keys()
1829 .cloned()
1830 .collect();
1831
1832 let component_tvl = if request.include_tvl && !component_ids.is_empty() {
1833 self.get_component_tvl_paginated(
1834 ComponentTvlPaginatedParams::new(request.chain, concurrency)
1835 .with_component_ids(component_ids.clone()),
1836 )
1837 .await?
1838 } else {
1839 HashMap::new()
1840 };
1841
1842 let version = VersionParam::at_block(request.chain.into(), request.block_number);
1843
1844 let mut protocol_states = if !component_ids.is_empty() {
1845 self.get_protocol_states_paginated(
1846 ProtocolStatesPaginatedParams::new(
1847 request.chain,
1848 request.protocol_system,
1849 concurrency,
1850 )
1851 .with_protocol_ids(component_ids.clone())
1852 .with_include_balances(request.include_balances)
1853 .with_version(version.clone()),
1854 )
1855 .await?
1856 .into_iter()
1857 .map(|state| (state.component_id.clone(), state))
1858 .collect()
1859 } else {
1860 HashMap::new()
1861 };
1862
1863 let states = request
1865 .components
1866 .values()
1867 .filter_map(|component| {
1868 if let Some(state) = protocol_states.remove(&component.id) {
1869 Some((
1870 component.id.clone(),
1871 ComponentWithState {
1872 state,
1873 component: component.clone(),
1874 component_tvl: component_tvl
1875 .get(&component.id)
1876 .cloned(),
1877 entrypoints: request
1878 .entrypoints
1879 .as_ref()
1880 .and_then(|map| map.get(&component.id))
1881 .cloned()
1882 .unwrap_or_default(),
1883 },
1884 ))
1885 } else if component_ids.contains(&component.id) {
1886 let component_id = &component.id;
1888 error!(?component_id, "Missing state for native component!");
1889 None
1890 } else {
1891 None
1892 }
1893 })
1894 .collect();
1895
1896 let vm_storage = if !request.contract_ids.is_empty() {
1897 let mut cp_params = ContractStatePaginatedParams::new(
1898 request.chain,
1899 request.protocol_system,
1900 concurrency,
1901 )
1902 .with_contract_ids(request.contract_ids.to_vec())
1903 .with_version(version.clone());
1904 if let Some(cs) = chunk_size {
1905 cp_params = cp_params.with_chunk_size(cs);
1906 }
1907 let contract_states = self
1908 .get_contract_state_paginated(cp_params)
1909 .await?
1910 .into_iter()
1911 .map(|acc| (acc.address.clone(), acc))
1912 .collect::<HashMap<_, _>>();
1913
1914 trace!(states=?&contract_states, "Retrieved ContractState");
1915
1916 let contract_address_to_components = request
1917 .components
1918 .iter()
1919 .filter_map(|(id, comp)| {
1920 if component_ids.contains(id) {
1921 Some(
1922 comp.contract_addresses
1923 .iter()
1924 .map(|address| (address.clone(), comp.id.clone())),
1925 )
1926 } else {
1927 None
1928 }
1929 })
1930 .flatten()
1931 .fold(HashMap::<Bytes, Vec<String>>::new(), |mut acc, (addr, c_id)| {
1932 acc.entry(addr).or_default().push(c_id);
1933 acc
1934 });
1935
1936 request
1937 .contract_ids
1938 .iter()
1939 .filter_map(|address| {
1940 if let Some(state) = contract_states.get(address) {
1941 Some((address.clone(), state.clone()))
1942 } else if let Some(ids) = contract_address_to_components.get(address) {
1943 error!(
1945 ?address,
1946 ?ids,
1947 "Component with lacking contract storage encountered!"
1948 );
1949 None
1950 } else {
1951 None
1952 }
1953 })
1954 .collect()
1955 } else {
1956 HashMap::new()
1957 };
1958
1959 Ok(Snapshot { states, vm_storage })
1960 }
1961}
1962
1963#[cfg(test)]
1964mod tests {
1965 use std::{
1966 collections::{HashMap, HashSet},
1967 str::FromStr,
1968 };
1969
1970 use mockito::Server;
1971 use rstest::rstest;
1972 use tycho_common::models::blockchain::AddressStorageLocation;
1973
1974 use super::*;
1975
1976 impl MockRPCClient {
1979 #[allow(clippy::too_many_arguments)]
1980 async fn test_get_protocol_states_paginated<T>(
1981 &self,
1982 chain: Chain,
1983 ids: &[T],
1984 protocol_system: &str,
1985 include_balances: bool,
1986 block_number: Option<u64>,
1987 chunk_size: usize,
1988 _concurrency: usize,
1989 ) -> Vec<(Chain, Vec<String>, String, bool, Option<u64>, PaginationParams)>
1990 where
1991 T: AsRef<str> + Clone + Send + Sync + 'static,
1992 {
1993 ids.chunks(chunk_size)
1994 .map(|chunk| {
1995 (
1996 chain,
1997 chunk
1998 .iter()
1999 .map(|id| id.as_ref().to_string())
2000 .collect(),
2001 protocol_system.to_string(),
2002 include_balances,
2003 block_number,
2004 PaginationParams { page: 0, page_size: chunk_size as i64 },
2005 )
2006 })
2007 .collect()
2008 }
2009 }
2010
2011 const GET_CONTRACT_STATE_RESP: &str = r#"
2012 {
2013 "accounts": [
2014 {
2015 "chain": "ethereum",
2016 "address": "0x0000000000000000000000000000000000000000",
2017 "title": "",
2018 "slots": {},
2019 "native_balance": "0x01f4",
2020 "token_balances": {},
2021 "code": "0x00",
2022 "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
2023 "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2024 "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2025 "creation_tx": null
2026 }
2027 ],
2028 "pagination": {
2029 "page": 0,
2030 "page_size": 20,
2031 "total": 10
2032 }
2033 }
2034 "#;
2035
2036 #[rstest]
2037 #[case::string_input(vec![
2038 "id1".to_string(),
2039 "id2".to_string()
2040 ])]
2041 #[tokio::test]
2042 async fn test_get_protocol_states_paginated<T>(#[case] ids: Vec<T>)
2043 where
2044 T: AsRef<str> + Clone + Send + Sync + 'static,
2045 {
2046 let mock_client = MockRPCClient::new();
2047
2048 let request_args = mock_client
2049 .test_get_protocol_states_paginated(
2050 Chain::Ethereum,
2051 &ids,
2052 "test_system",
2053 true,
2054 None,
2055 2,
2056 2,
2057 )
2058 .await;
2059
2060 assert_eq!(request_args.len(), 1);
2062 assert_eq!(request_args[0].1.len(), 2);
2063 }
2064
2065 #[tokio::test]
2066 async fn test_get_contract_state() {
2067 let mut server = Server::new_async().await;
2068 let server_resp = GET_CONTRACT_STATE_RESP;
2069 serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
2071
2072 let mocked_server = server
2073 .mock("POST", "/v1/contract_state")
2074 .expect(1)
2075 .with_body(server_resp)
2076 .create_async()
2077 .await;
2078
2079 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2080 .expect("create client");
2081
2082 let accounts = client
2083 .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
2084 .await
2085 .expect("get state");
2086
2087 mocked_server.assert();
2088 assert_eq!(accounts.data().len(), 1);
2089 assert_eq!(accounts.data()[0].slots, HashMap::new());
2090 assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
2091 assert_eq!(accounts.data()[0].code, [0].to_vec());
2092 assert_eq!(
2093 accounts.data()[0].code_hash,
2094 hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
2095 .unwrap()
2096 );
2097 }
2098
2099 #[tokio::test]
2100 async fn test_get_protocol_components() {
2101 let mut server = Server::new_async().await;
2102 let server_resp = r#"
2103 {
2104 "protocol_components": [
2105 {
2106 "id": "State1",
2107 "protocol_system": "ambient",
2108 "protocol_type_name": "Pool",
2109 "chain": "ethereum",
2110 "tokens": [
2111 "0x0000000000000000000000000000000000000000",
2112 "0x0000000000000000000000000000000000000001"
2113 ],
2114 "contract_ids": [
2115 "0x0000000000000000000000000000000000000000"
2116 ],
2117 "static_attributes": {
2118 "attribute_1": "0x00000000000003e8"
2119 },
2120 "change": "Creation",
2121 "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2122 "created_at": "2022-01-01T00:00:00"
2123 }
2124 ],
2125 "pagination": {
2126 "page": 0,
2127 "page_size": 20,
2128 "total": 10
2129 }
2130 }
2131 "#;
2132 serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
2134
2135 let mocked_server = server
2136 .mock("POST", "/v1/protocol_components")
2137 .expect(1)
2138 .with_body(server_resp)
2139 .create_async()
2140 .await;
2141
2142 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2143 .expect("create client");
2144
2145 let components = client
2146 .get_protocol_components(ProtocolComponentsParams::new(Chain::Ethereum, ""))
2147 .await
2148 .expect("get state");
2149
2150 mocked_server.assert();
2151 assert_eq!(components.data().len(), 1);
2152 assert_eq!(components.data()[0].id, "State1");
2153 assert_eq!(components.data()[0].protocol_system, "ambient");
2154 assert_eq!(components.data()[0].protocol_type_name, "Pool");
2155 assert_eq!(components.data()[0].tokens.len(), 2);
2156 let expected_attributes =
2157 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2158 .iter()
2159 .cloned()
2160 .collect::<HashMap<String, Bytes>>();
2161 assert_eq!(components.data()[0].static_attributes, expected_attributes);
2162 }
2163
2164 #[tokio::test]
2165 async fn test_get_protocol_states() {
2166 let mut server = Server::new_async().await;
2167 let server_resp = r#"
2168 {
2169 "states": [
2170 {
2171 "component_id": "State1",
2172 "attributes": {
2173 "attribute_1": "0x00000000000003e8"
2174 },
2175 "balances": {
2176 "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
2177 }
2178 }
2179 ],
2180 "pagination": {
2181 "page": 0,
2182 "page_size": 20,
2183 "total": 10
2184 }
2185 }
2186 "#;
2187 serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
2189
2190 let mocked_server = server
2191 .mock("POST", "/v1/protocol_state")
2192 .expect(1)
2193 .with_body(server_resp)
2194 .create_async()
2195 .await;
2196 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2197 .expect("create client");
2198
2199 let states = client
2200 .get_protocol_states(
2201 ProtocolStatesParams::new(Chain::Ethereum, "").with_include_balances(true),
2202 )
2203 .await
2204 .expect("get state");
2205
2206 mocked_server.assert();
2207 assert_eq!(states.data().len(), 1);
2208 assert_eq!(states.data()[0].component_id, "State1");
2209 let expected_attributes =
2210 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2211 .iter()
2212 .cloned()
2213 .collect::<HashMap<String, Bytes>>();
2214 assert_eq!(states.data()[0].attributes, expected_attributes);
2215 let expected_balances = [(
2216 Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
2217 .expect("Unsupported address format"),
2218 Bytes::from_str("0x01f4").unwrap(),
2219 )]
2220 .iter()
2221 .cloned()
2222 .collect::<HashMap<Bytes, Bytes>>();
2223 assert_eq!(states.data()[0].balances, expected_balances);
2224 }
2225
2226 #[tokio::test]
2227 async fn test_get_tokens() {
2228 let mut server = Server::new_async().await;
2229 let server_resp = r#"
2230 {
2231 "tokens": [
2232 {
2233 "chain": "ethereum",
2234 "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
2235 "symbol": "WETH",
2236 "decimals": 18,
2237 "tax": 0,
2238 "gas": [
2239 29962
2240 ],
2241 "quality": 100
2242 },
2243 {
2244 "chain": "ethereum",
2245 "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
2246 "symbol": "USDC",
2247 "decimals": 6,
2248 "tax": 0,
2249 "gas": [
2250 40652
2251 ],
2252 "quality": 100
2253 }
2254 ],
2255 "pagination": {
2256 "page": 0,
2257 "page_size": 20,
2258 "total": 10
2259 }
2260 }
2261 "#;
2262 serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
2264
2265 let mocked_server = server
2266 .mock("POST", "/v1/tokens")
2267 .expect(1)
2268 .with_body(server_resp)
2269 .create_async()
2270 .await;
2271 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2272 .expect("create client");
2273
2274 let tokens = client
2275 .get_tokens(TokensParams::new(Chain::Ethereum))
2276 .await
2277 .expect("get tokens");
2278
2279 let expected = vec![
2280 Token {
2281 chain: tycho_common::models::Chain::Ethereum,
2282 address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
2283 symbol: "WETH".to_string(),
2284 decimals: 18,
2285 tax: 0,
2286 gas: vec![Some(29962)],
2287 quality: 100,
2288 },
2289 Token {
2290 chain: tycho_common::models::Chain::Ethereum,
2291 address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
2292 symbol: "USDC".to_string(),
2293 decimals: 6,
2294 tax: 0,
2295 gas: vec![Some(40652)],
2296 quality: 100,
2297 },
2298 ];
2299
2300 mocked_server.assert();
2301 assert_eq!(*tokens.data(), expected);
2302 }
2303
2304 #[rstest]
2305 #[case::with_dci(Some(vec!["system2"]), vec!["system2"])]
2306 #[case::backward_compat(None, vec![])]
2307 #[tokio::test]
2308 async fn test_get_protocol_systems(
2309 #[case] dci_protocols: Option<Vec<&str>>,
2310 #[case] expected_dci: Vec<&str>,
2311 ) {
2312 use serde_json::json;
2313
2314 let mut json_value = json!({
2315 "protocol_systems": ["system1", "system2"],
2316 "pagination": { "page": 0, "page_size": 20, "total": 2 }
2317 });
2318 if let Some(dci) = dci_protocols {
2319 json_value["dci_protocols"] = json!(dci);
2320 }
2321 let server_resp = serde_json::to_string(&json_value).unwrap();
2322
2323 let mut server = Server::new_async().await;
2324 let mocked_server = server
2325 .mock("POST", "/v1/protocol_systems")
2326 .expect(1)
2327 .with_body(&server_resp)
2328 .create_async()
2329 .await;
2330 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2331 .expect("create client");
2332
2333 let response = client
2334 .get_protocol_systems(ProtocolSystemsParams::new(Chain::Ethereum))
2335 .await
2336 .expect("get protocol systems");
2337
2338 mocked_server.assert();
2339 assert_eq!(response.data().protocol_systems(), ["system1", "system2"]);
2340 assert_eq!(response.data().dci_protocols(), expected_dci.as_slice());
2341 }
2342
2343 #[tokio::test]
2344 async fn test_get_component_tvl() {
2345 let mut server = Server::new_async().await;
2346 let server_resp = r#"
2347 {
2348 "tvl": {
2349 "component1": 100.0
2350 },
2351 "pagination": {
2352 "page": 0,
2353 "page_size": 20,
2354 "total": 10
2355 }
2356 }
2357 "#;
2358 serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
2360
2361 let mocked_server = server
2362 .mock("POST", "/v1/component_tvl")
2363 .expect(1)
2364 .with_body(server_resp)
2365 .create_async()
2366 .await;
2367 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2368 .expect("create client");
2369
2370 let component_tvl = client
2371 .get_component_tvl(ComponentTvlParams::new(Chain::Ethereum))
2372 .await
2373 .expect("get component tvl");
2374
2375 mocked_server.assert();
2376 assert_eq!(component_tvl.data().get("component1"), Some(&100.0));
2377 }
2378
2379 #[tokio::test]
2380 async fn test_get_traced_entry_points() {
2381 let mut server = Server::new_async().await;
2382 let server_resp = r#"
2383 {
2384 "traced_entry_points": {
2385 "component_1": [
2386 [
2387 {
2388 "entry_point": {
2389 "external_id": "entrypoint_a",
2390 "target": "0x0000000000000000000000000000000000000001",
2391 "signature": "sig()"
2392 },
2393 "params": {
2394 "method": "rpctracer",
2395 "caller": "0x000000000000000000000000000000000000000a",
2396 "calldata": "0x000000000000000000000000000000000000000b"
2397 }
2398 },
2399 {
2400 "retriggers": [
2401 [
2402 "0x00000000000000000000000000000000000000aa",
2403 {"key": "0x0000000000000000000000000000000000000aaa", "offset": 12}
2404 ]
2405 ],
2406 "accessed_slots": {
2407 "0x0000000000000000000000000000000000aaaa": [
2408 "0x0000000000000000000000000000000000aaaa"
2409 ]
2410 }
2411 }
2412 ]
2413 ]
2414 },
2415 "pagination": {
2416 "page": 0,
2417 "page_size": 20,
2418 "total": 1
2419 }
2420 }
2421 "#;
2422 serde_json::from_str::<TracedEntryPointRequestResponse>(server_resp).expect("deserialize");
2424
2425 let mocked_server = server
2426 .mock("POST", "/v1/traced_entry_points")
2427 .expect(1)
2428 .with_body(server_resp)
2429 .create_async()
2430 .await;
2431 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2432 .expect("create client");
2433
2434 let entrypoints = client
2435 .get_traced_entry_points(TracedEntryPointsParams::new(Chain::Ethereum, ""))
2436 .await
2437 .expect("get traced entry points");
2438
2439 mocked_server.assert();
2440 assert_eq!(entrypoints.data().len(), 1);
2441 let comp1_entrypoints = entrypoints
2442 .data()
2443 .get("component_1")
2444 .expect("component_1 entrypoints should exist");
2445 assert_eq!(comp1_entrypoints.len(), 1);
2446
2447 let (entrypoint, trace_result) = &comp1_entrypoints[0];
2448 assert_eq!(entrypoint.entry_point.external_id, "entrypoint_a");
2449 assert_eq!(
2450 entrypoint.entry_point.target,
2451 Bytes::from_str("0x0000000000000000000000000000000000000001").unwrap()
2452 );
2453 assert_eq!(entrypoint.entry_point.signature, "sig()");
2454 let tycho_common::models::blockchain::TracingParams::RPCTracer(rpc_params) =
2455 &entrypoint.params;
2456 assert_eq!(
2457 rpc_params.caller,
2458 Some(Bytes::from("0x000000000000000000000000000000000000000a"))
2459 );
2460 assert_eq!(rpc_params.calldata, Bytes::from("0x000000000000000000000000000000000000000b"));
2461
2462 assert_eq!(
2463 trace_result.retriggers,
2464 HashSet::from([(
2465 Bytes::from("0x00000000000000000000000000000000000000aa"),
2466 AddressStorageLocation::new(
2467 Bytes::from("0x0000000000000000000000000000000000000aaa"),
2468 12
2469 )
2470 )])
2471 );
2472 assert_eq!(trace_result.accessed_slots.len(), 1);
2473 assert_eq!(
2474 trace_result.accessed_slots,
2475 HashMap::from([(
2476 Bytes::from("0x0000000000000000000000000000000000aaaa"),
2477 HashSet::from([Bytes::from("0x0000000000000000000000000000000000aaaa")])
2478 )])
2479 );
2480 }
2481
2482 #[tokio::test]
2483 async fn test_parse_retry_value_numeric() {
2484 let result = parse_retry_value("60");
2485 assert!(result.is_some());
2486
2487 let expected_time = SystemTime::now() + Duration::from_secs(60);
2488 let actual_time = result.unwrap();
2489
2490 let diff = if actual_time > expected_time {
2492 actual_time
2493 .duration_since(expected_time)
2494 .unwrap()
2495 } else {
2496 expected_time
2497 .duration_since(actual_time)
2498 .unwrap()
2499 };
2500 assert!(diff < Duration::from_secs(1), "Time difference too large: {:?}", diff);
2501 }
2502
2503 #[tokio::test]
2504 async fn test_parse_retry_value_rfc2822() {
2505 let rfc2822_date = "Sat, 01 Jan 2030 12:00:00 +0000";
2507 let result = parse_retry_value(rfc2822_date);
2508 assert!(result.is_some());
2509
2510 let parsed_time = result.unwrap();
2511 assert!(parsed_time > SystemTime::now());
2512 }
2513
2514 #[tokio::test]
2515 async fn test_parse_retry_value_invalid_formats() {
2516 assert!(parse_retry_value("invalid").is_none());
2518 assert!(parse_retry_value("").is_none());
2519 assert!(parse_retry_value("not_a_number").is_none());
2520 assert!(parse_retry_value("Mon, 32 Jan 2030 25:00:00 +0000").is_none());
2521 }
2523
2524 #[tokio::test]
2525 async fn test_parse_retry_value_zero_seconds() {
2526 let result = parse_retry_value("0");
2527 assert!(result.is_some());
2528
2529 let expected_time = SystemTime::now();
2530 let actual_time = result.unwrap();
2531
2532 let diff = if actual_time > expected_time {
2534 actual_time
2535 .duration_since(expected_time)
2536 .unwrap()
2537 } else {
2538 expected_time
2539 .duration_since(actual_time)
2540 .unwrap()
2541 };
2542 assert!(diff < Duration::from_secs(1));
2543 }
2544
2545 #[tokio::test]
2546 async fn test_error_for_response_rate_limited() {
2547 let mut server = Server::new_async().await;
2548 let mock = server
2549 .mock("GET", "/test")
2550 .with_status(429)
2551 .with_header("Retry-After", "60")
2552 .create_async()
2553 .await;
2554
2555 let client = reqwest::Client::new();
2556 let response = client
2557 .get(format!("{}/test", server.url()))
2558 .send()
2559 .await
2560 .unwrap();
2561
2562 let http_client =
2563 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2564 .unwrap()
2565 .with_test_backoff_policy();
2566 let result = http_client
2567 .error_for_response(response)
2568 .await;
2569
2570 mock.assert();
2571 assert!(matches!(result, Err(RPCError::RateLimited(_))));
2572 if let Err(RPCError::RateLimited(retry_after)) = result {
2573 assert!(retry_after.is_some());
2574 }
2575 }
2576
2577 #[tokio::test]
2578 async fn test_error_for_response_rate_limited_no_header() {
2579 let mut server = Server::new_async().await;
2580 let mock = server
2581 .mock("GET", "/test")
2582 .with_status(429)
2583 .create_async()
2584 .await;
2585
2586 let client = reqwest::Client::new();
2587 let response = client
2588 .get(format!("{}/test", server.url()))
2589 .send()
2590 .await
2591 .unwrap();
2592
2593 let http_client =
2594 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2595 .unwrap()
2596 .with_test_backoff_policy();
2597 let result = http_client
2598 .error_for_response(response)
2599 .await;
2600
2601 mock.assert();
2602 assert!(matches!(result, Err(RPCError::RateLimited(None))));
2603 }
2604
2605 #[tokio::test]
2606 async fn test_error_for_response_server_errors() {
2607 let test_cases =
2608 vec![(502, "Bad Gateway"), (503, "Service Unavailable"), (504, "Gateway Timeout")];
2609
2610 for (status_code, expected_body) in test_cases {
2611 let mut server = Server::new_async().await;
2612 let mock = server
2613 .mock("GET", "/test")
2614 .with_status(status_code)
2615 .with_body(expected_body)
2616 .create_async()
2617 .await;
2618
2619 let client = reqwest::Client::new();
2620 let response = client
2621 .get(format!("{}/test", server.url()))
2622 .send()
2623 .await
2624 .unwrap();
2625
2626 let http_client =
2627 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2628 .unwrap()
2629 .with_test_backoff_policy();
2630 let result = http_client
2631 .error_for_response(response)
2632 .await;
2633
2634 mock.assert();
2635 assert!(matches!(result, Err(RPCError::ServerUnreachable(_))));
2636 if let Err(RPCError::ServerUnreachable(body)) = result {
2637 assert_eq!(body, expected_body);
2638 }
2639 }
2640 }
2641
2642 #[tokio::test]
2643 async fn test_error_for_response_success() {
2644 let mut server = Server::new_async().await;
2645 let mock = server
2646 .mock("GET", "/test")
2647 .with_status(200)
2648 .with_body("success")
2649 .create_async()
2650 .await;
2651
2652 let client = reqwest::Client::new();
2653 let response = client
2654 .get(format!("{}/test", server.url()))
2655 .send()
2656 .await
2657 .unwrap();
2658
2659 let http_client =
2660 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2661 .unwrap()
2662 .with_test_backoff_policy();
2663 let result = http_client
2664 .error_for_response(response)
2665 .await;
2666
2667 mock.assert();
2668 assert!(result.is_ok());
2669
2670 let response = result.unwrap();
2671 assert_eq!(response.status(), 200);
2672 }
2673
2674 #[tokio::test]
2675 async fn test_handle_error_for_backoff_server_unreachable() {
2676 let http_client =
2677 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2678 .unwrap()
2679 .with_test_backoff_policy();
2680 let error = RPCError::ServerUnreachable("Service down".to_string());
2681
2682 let backoff_error = http_client
2683 .handle_error_for_backoff(error)
2684 .await;
2685
2686 match backoff_error {
2687 backoff::Error::Transient { err: RPCError::ServerUnreachable(msg), retry_after } => {
2688 assert_eq!(msg, "Service down");
2689 assert_eq!(retry_after, Some(Duration::from_millis(50))); }
2691 _ => panic!("Expected transient error for ServerUnreachable"),
2692 }
2693 }
2694
2695 #[tokio::test]
2696 async fn test_handle_error_for_backoff_rate_limited_with_retry_after() {
2697 let http_client =
2698 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2699 .unwrap()
2700 .with_test_backoff_policy();
2701 let future_time = SystemTime::now() + Duration::from_secs(30);
2702 let error = RPCError::RateLimited(Some(future_time));
2703
2704 let backoff_error = http_client
2705 .handle_error_for_backoff(error)
2706 .await;
2707
2708 match backoff_error {
2709 backoff::Error::Transient { err: RPCError::RateLimited(retry_after), .. } => {
2710 assert_eq!(retry_after, Some(future_time));
2711 }
2712 _ => panic!("Expected transient error for RateLimited"),
2713 }
2714
2715 let stored_retry_after = http_client.retry_after.read().await;
2717 assert_eq!(*stored_retry_after, Some(future_time));
2718 }
2719
2720 #[tokio::test]
2721 async fn test_handle_error_for_backoff_rate_limited_no_retry_after() {
2722 let http_client =
2723 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2724 .unwrap()
2725 .with_test_backoff_policy();
2726 let error = RPCError::RateLimited(None);
2727
2728 let backoff_error = http_client
2729 .handle_error_for_backoff(error)
2730 .await;
2731
2732 match backoff_error {
2733 backoff::Error::Transient { err: RPCError::RateLimited(None), .. } => {
2734 }
2736 _ => panic!("Expected transient error for RateLimited without retry-after"),
2737 }
2738 }
2739
2740 #[tokio::test]
2741 async fn test_handle_error_for_backoff_other_errors() {
2742 let http_client =
2743 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2744 .unwrap()
2745 .with_test_backoff_policy();
2746 let error = RPCError::ParseResponse("Invalid JSON".to_string());
2747
2748 let backoff_error = http_client
2749 .handle_error_for_backoff(error)
2750 .await;
2751
2752 match backoff_error {
2753 backoff::Error::Permanent(RPCError::ParseResponse(msg)) => {
2754 assert_eq!(msg, "Invalid JSON");
2755 }
2756 _ => panic!("Expected permanent error for ParseResponse"),
2757 }
2758 }
2759
2760 #[tokio::test]
2761 async fn test_wait_until_retry_after_no_retry_time() {
2762 let http_client =
2763 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2764 .unwrap()
2765 .with_test_backoff_policy();
2766
2767 let start = std::time::Instant::now();
2768 http_client
2769 .wait_until_retry_after()
2770 .await;
2771 let elapsed = start.elapsed();
2772
2773 assert!(elapsed < Duration::from_millis(100));
2775 }
2776
2777 #[tokio::test]
2778 async fn test_wait_until_retry_after_past_time() {
2779 let http_client =
2780 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2781 .unwrap()
2782 .with_test_backoff_policy();
2783
2784 let past_time = SystemTime::now() - Duration::from_secs(10);
2786 *http_client.retry_after.write().await = Some(past_time);
2787
2788 let start = std::time::Instant::now();
2789 http_client
2790 .wait_until_retry_after()
2791 .await;
2792 let elapsed = start.elapsed();
2793
2794 assert!(elapsed < Duration::from_millis(100));
2796 }
2797
2798 #[tokio::test]
2799 async fn test_wait_until_retry_after_future_time() {
2800 let http_client =
2801 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2802 .unwrap()
2803 .with_test_backoff_policy();
2804
2805 let future_time = SystemTime::now() + Duration::from_millis(100);
2807 *http_client.retry_after.write().await = Some(future_time);
2808
2809 let start = std::time::Instant::now();
2810 http_client
2811 .wait_until_retry_after()
2812 .await;
2813 let elapsed = start.elapsed();
2814
2815 assert!(elapsed >= Duration::from_millis(80)); assert!(elapsed <= Duration::from_millis(200)); }
2819
2820 #[tokio::test]
2821 async fn test_make_post_request_success() {
2822 let mut server = Server::new_async().await;
2823 let server_resp = r#"{"success": true}"#;
2824
2825 let mock = server
2826 .mock("POST", "/test")
2827 .with_status(200)
2828 .with_body(server_resp)
2829 .create_async()
2830 .await;
2831
2832 let http_client =
2833 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2834 .unwrap()
2835 .with_test_backoff_policy();
2836 let request_body = serde_json::json!({"test": "data"});
2837 let uri = format!("{}/test", server.url());
2838
2839 let result = http_client
2840 .make_post_request(&request_body, &uri)
2841 .await;
2842
2843 mock.assert();
2844 assert!(result.is_ok());
2845
2846 let response = result.unwrap();
2847 assert_eq!(response.status(), 200);
2848 assert_eq!(response.text().await.unwrap(), server_resp);
2849 }
2850
2851 #[tokio::test]
2852 async fn test_make_post_request_retry_on_server_error() {
2853 let mut server = Server::new_async().await;
2854 let error_mock = server
2856 .mock("POST", "/test")
2857 .with_status(503)
2858 .with_body("Service Unavailable")
2859 .expect(1)
2860 .create_async()
2861 .await;
2862
2863 let success_mock = server
2864 .mock("POST", "/test")
2865 .with_status(200)
2866 .with_body(r#"{"success": true}"#)
2867 .expect(1)
2868 .create_async()
2869 .await;
2870
2871 let http_client =
2872 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2873 .unwrap()
2874 .with_test_backoff_policy();
2875 let request_body = serde_json::json!({"test": "data"});
2876 let uri = format!("{}/test", server.url());
2877
2878 let result = http_client
2879 .make_post_request(&request_body, &uri)
2880 .await;
2881
2882 error_mock.assert();
2883 success_mock.assert();
2884 assert!(result.is_ok());
2885 }
2886
2887 #[tokio::test]
2888 async fn test_make_post_request_respect_retry_after_header() {
2889 let mut server = Server::new_async().await;
2890
2891 let rate_limit_mock = server
2893 .mock("POST", "/test")
2894 .with_status(429)
2895 .with_header("Retry-After", "1") .expect(1)
2897 .create_async()
2898 .await;
2899
2900 let success_mock = server
2901 .mock("POST", "/test")
2902 .with_status(200)
2903 .with_body(r#"{"success": true}"#)
2904 .expect(1)
2905 .create_async()
2906 .await;
2907
2908 let http_client =
2909 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2910 .unwrap()
2911 .with_test_backoff_policy();
2912 let request_body = serde_json::json!({"test": "data"});
2913 let uri = format!("{}/test", server.url());
2914
2915 let start = std::time::Instant::now();
2916 let result = http_client
2917 .make_post_request(&request_body, &uri)
2918 .await;
2919 let elapsed = start.elapsed();
2920
2921 rate_limit_mock.assert();
2922 success_mock.assert();
2923 assert!(result.is_ok());
2924
2925 assert!(elapsed >= Duration::from_millis(900)); assert!(elapsed <= Duration::from_millis(2000)); }
2929
2930 #[tokio::test]
2931 async fn test_make_post_request_permanent_error() {
2932 let mut server = Server::new_async().await;
2933
2934 let mock = server
2935 .mock("POST", "/test")
2936 .with_status(400) .with_body("Bad Request")
2938 .expect(1)
2939 .create_async()
2940 .await;
2941
2942 let http_client =
2943 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2944 .unwrap()
2945 .with_test_backoff_policy();
2946 let request_body = serde_json::json!({"test": "data"});
2947 let uri = format!("{}/test", server.url());
2948
2949 let result = http_client
2950 .make_post_request(&request_body, &uri)
2951 .await;
2952
2953 mock.assert();
2954 assert!(result.is_ok()); let response = result.unwrap();
2957 assert_eq!(response.status(), 400);
2958 }
2959
2960 #[tokio::test]
2961 async fn test_concurrent_requests_with_different_retry_after() {
2962 let mut server = Server::new_async().await;
2963
2964 let rate_limit_mock_1 = server
2966 .mock("POST", "/test1")
2967 .with_status(429)
2968 .with_header("Retry-After", "1")
2969 .expect(1)
2970 .create_async()
2971 .await;
2972
2973 let rate_limit_mock_2 = server
2975 .mock("POST", "/test2")
2976 .with_status(429)
2977 .with_header("Retry-After", "2")
2978 .expect(1)
2979 .create_async()
2980 .await;
2981
2982 let success_mock_1 = server
2984 .mock("POST", "/test1")
2985 .with_status(200)
2986 .with_body(r#"{"result": "success1"}"#)
2987 .expect(1)
2988 .create_async()
2989 .await;
2990
2991 let success_mock_2 = server
2992 .mock("POST", "/test2")
2993 .with_status(200)
2994 .with_body(r#"{"result": "success2"}"#)
2995 .expect(1)
2996 .create_async()
2997 .await;
2998
2999 let http_client =
3000 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3001 .unwrap()
3002 .with_test_backoff_policy();
3003 let request_body = serde_json::json!({"test": "data"});
3004
3005 let uri1 = format!("{}/test1", server.url());
3006 let uri2 = format!("{}/test2", server.url());
3007
3008 let start = std::time::Instant::now();
3010 let (result1, result2) = tokio::join!(
3011 http_client.make_post_request(&request_body, &uri1),
3012 http_client.make_post_request(&request_body, &uri2)
3013 );
3014 let elapsed = start.elapsed();
3015
3016 rate_limit_mock_1.assert();
3017 rate_limit_mock_2.assert();
3018 success_mock_1.assert();
3019 success_mock_2.assert();
3020
3021 assert!(result1.is_ok());
3022 assert!(result2.is_ok());
3023
3024 assert!(elapsed >= Duration::from_millis(1800)); assert!(elapsed <= Duration::from_millis(3000)); let final_retry_after = http_client.retry_after.read().await;
3032 assert!(final_retry_after.is_some());
3033
3034 if let Some(retry_time) = *final_retry_after {
3036 let now = SystemTime::now();
3039 let diff = if retry_time > now {
3040 retry_time.duration_since(now).unwrap()
3041 } else {
3042 now.duration_since(retry_time).unwrap()
3043 };
3044
3045 assert!(diff <= Duration::from_secs(3), "Retry time difference too large: {:?}", diff);
3047 }
3048 }
3049
3050 #[tokio::test]
3051 async fn test_get_snapshots() {
3052 let mut server = Server::new_async().await;
3053
3054 let protocol_states_resp = r#"
3056 {
3057 "states": [
3058 {
3059 "component_id": "component1",
3060 "attributes": {
3061 "attribute_1": "0x00000000000003e8"
3062 },
3063 "balances": {
3064 "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
3065 }
3066 }
3067 ],
3068 "pagination": {
3069 "page": 0,
3070 "page_size": 100,
3071 "total": 1
3072 }
3073 }
3074 "#;
3075
3076 let contract_state_resp = r#"
3078 {
3079 "accounts": [
3080 {
3081 "chain": "ethereum",
3082 "address": "0x1111111111111111111111111111111111111111",
3083 "title": "",
3084 "slots": {},
3085 "native_balance": "0x01f4",
3086 "token_balances": {},
3087 "code": "0x00",
3088 "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
3089 "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3090 "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3091 "creation_tx": null
3092 }
3093 ],
3094 "pagination": {
3095 "page": 0,
3096 "page_size": 100,
3097 "total": 1
3098 }
3099 }
3100 "#;
3101
3102 let tvl_resp = r#"
3104 {
3105 "tvl": {
3106 "component1": 1000000.0
3107 },
3108 "pagination": {
3109 "page": 0,
3110 "page_size": 100,
3111 "total": 1
3112 }
3113 }
3114 "#;
3115
3116 let protocol_states_mock = server
3117 .mock("POST", "/v1/protocol_state")
3118 .expect(1)
3119 .with_body(protocol_states_resp)
3120 .create_async()
3121 .await;
3122
3123 let contract_state_mock = server
3124 .mock("POST", "/v1/contract_state")
3125 .expect(1)
3126 .with_body(contract_state_resp)
3127 .create_async()
3128 .await;
3129
3130 let tvl_mock = server
3131 .mock("POST", "/v1/component_tvl")
3132 .expect(1)
3133 .with_body(tvl_resp)
3134 .create_async()
3135 .await;
3136
3137 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3138 .expect("create client");
3139
3140 let component = tycho_common::models::protocol::ProtocolComponent {
3141 id: "component1".to_string(),
3142 protocol_system: "test_protocol".to_string(),
3143 protocol_type_name: "test_type".to_string(),
3144 chain: Chain::Ethereum,
3145 tokens: vec![],
3146 contract_addresses: vec![
3147 Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()
3148 ],
3149 static_attributes: HashMap::new(),
3150 change: tycho_common::models::ChangeType::Creation,
3151 creation_tx: Bytes::from_str(
3152 "0x0000000000000000000000000000000000000000000000000000000000000000",
3153 )
3154 .unwrap(),
3155 created_at: chrono::Utc::now().naive_utc(),
3156 };
3157
3158 let mut components = HashMap::new();
3159 components.insert("component1".to_string(), component);
3160
3161 let contract_ids =
3162 vec![Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()];
3163
3164 let request = SnapshotParameters::new(
3165 Chain::Ethereum,
3166 "test_protocol",
3167 &components,
3168 &contract_ids,
3169 12345,
3170 );
3171
3172 let response = client
3173 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3174 .await
3175 .expect("get snapshots");
3176
3177 protocol_states_mock.assert();
3179 contract_state_mock.assert();
3180 tvl_mock.assert();
3181
3182 assert_eq!(response.states.len(), 1);
3184 assert!(response
3185 .states
3186 .contains_key("component1"));
3187
3188 let component_state = response
3190 .states
3191 .get("component1")
3192 .unwrap();
3193 assert_eq!(component_state.component_tvl, Some(1000000.0));
3194
3195 assert_eq!(response.vm_storage.len(), 1);
3197 let contract_addr = Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap();
3198 assert!(response
3199 .vm_storage
3200 .contains_key(&contract_addr));
3201 }
3202
3203 #[tokio::test]
3204 async fn test_get_snapshots_empty_components() {
3205 let server = Server::new_async().await;
3206 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3207 .expect("create client");
3208
3209 let components = HashMap::new();
3210 let contract_ids = vec![];
3211
3212 let request = SnapshotParameters::new(
3213 Chain::Ethereum,
3214 "test_protocol",
3215 &components,
3216 &contract_ids,
3217 12345,
3218 );
3219
3220 let response = client
3221 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3222 .await
3223 .expect("get snapshots");
3224
3225 assert!(response.states.is_empty());
3227 assert!(response.vm_storage.is_empty());
3228 }
3229
3230 #[tokio::test]
3231 async fn test_get_snapshots_without_tvl() {
3232 let mut server = Server::new_async().await;
3233
3234 let protocol_states_resp = r#"
3235 {
3236 "states": [
3237 {
3238 "component_id": "component1",
3239 "attributes": {},
3240 "balances": {}
3241 }
3242 ],
3243 "pagination": {
3244 "page": 0,
3245 "page_size": 100,
3246 "total": 1
3247 }
3248 }
3249 "#;
3250
3251 let protocol_states_mock = server
3252 .mock("POST", "/v1/protocol_state")
3253 .expect(1)
3254 .with_body(protocol_states_resp)
3255 .create_async()
3256 .await;
3257
3258 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3259 .expect("create client");
3260
3261 let component = tycho_common::models::protocol::ProtocolComponent {
3263 id: "component1".to_string(),
3264 protocol_system: "test_protocol".to_string(),
3265 protocol_type_name: "test_type".to_string(),
3266 chain: Chain::Ethereum,
3267 tokens: vec![],
3268 contract_addresses: vec![],
3269 static_attributes: HashMap::new(),
3270 change: tycho_common::models::ChangeType::Creation,
3271 creation_tx: Bytes::from_str(
3272 "0x0000000000000000000000000000000000000000000000000000000000000000",
3273 )
3274 .unwrap(),
3275 created_at: chrono::Utc::now().naive_utc(),
3276 };
3277
3278 let mut components = HashMap::new();
3279 components.insert("component1".to_string(), component);
3280 let contract_ids = vec![];
3281
3282 let request = SnapshotParameters::new(
3283 Chain::Ethereum,
3284 "test_protocol",
3285 &components,
3286 &contract_ids,
3287 12345,
3288 )
3289 .include_balances(false)
3290 .include_tvl(false);
3291
3292 let response = client
3293 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3294 .await
3295 .expect("get snapshots");
3296
3297 protocol_states_mock.assert();
3299 assert_eq!(response.states.len(), 1);
3303 let component_state = response
3305 .states
3306 .get("component1")
3307 .unwrap();
3308 assert_eq!(component_state.component_tvl, None);
3309 }
3310
3311 #[tokio::test]
3312 async fn test_compression_enabled() {
3313 let mut server = Server::new_async().await;
3314 let server_resp = GET_CONTRACT_STATE_RESP;
3315
3316 let compressed_body =
3318 zstd::encode_all(server_resp.as_bytes(), 0).expect("compression failed");
3319
3320 let mocked_server = server
3321 .mock("POST", "/v1/contract_state")
3322 .expect(1)
3323 .with_header("Content-Encoding", "zstd")
3324 .with_body(compressed_body)
3325 .create_async()
3326 .await;
3327
3328 let client = HttpRPCClient::new(
3330 server.url().as_str(),
3331 HttpRPCClientOptions::new().with_compression(true),
3332 )
3333 .expect("create client");
3334
3335 let response = client
3336 .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3337 .await
3338 .expect("get state");
3339 let accounts = response;
3340
3341 mocked_server.assert();
3342 assert_eq!(accounts.data().len(), 1);
3343 assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3344 }
3345
3346 #[tokio::test]
3347 async fn test_compression_disabled() {
3348 let mut server = Server::new_async().await;
3349 let server_resp = GET_CONTRACT_STATE_RESP;
3350
3351 let mocked_server = server
3354 .mock("POST", "/v1/contract_state")
3355 .expect(1)
3356 .match_header("Accept-Encoding", mockito::Matcher::Missing)
3357 .with_status(200)
3358 .with_body(server_resp)
3359 .create_async()
3360 .await;
3361
3362 let client = HttpRPCClient::new(
3364 server.url().as_str(),
3365 HttpRPCClientOptions::new().with_compression(false),
3366 )
3367 .expect("create client");
3368
3369 let response = client
3370 .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3371 .await
3372 .expect("get state");
3373 let accounts = response;
3374
3375 mocked_server.assert();
3377 assert_eq!(accounts.data().len(), 1);
3378 assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3379 }
3380
3381 #[rstest]
3382 #[case::single_page(2, 1000)]
3383 #[case::multiple_pages_within_concurrency(10, 2)]
3384 #[case::exceeds_concurrency_limit(60, 2)]
3385 #[tokio::test]
3386 async fn test_get_all_tokens_pagination_and_concurrency(
3387 #[case] total_tokens: usize,
3388 #[case] page_size: usize,
3389 ) {
3390 use std::sync::atomic::{AtomicUsize, Ordering};
3391
3392 let allowed_concurrency = 10;
3393
3394 let concurrent_requests = Arc::new(AtomicUsize::new(0));
3395 let max_concurrent = Arc::new(AtomicUsize::new(0));
3396
3397 let mut server = Server::new_async().await;
3398
3399 let total_pages = (total_tokens as f64 / page_size as f64).ceil() as i64;
3400
3401 for page in 0..total_pages {
3403 let concurrent = concurrent_requests.clone();
3404 let max_conc = max_concurrent.clone();
3405
3406 let tokens_in_page = {
3407 let start_idx = (page as usize) * page_size;
3408 let end_idx = ((page as usize + 1) * page_size).min(total_tokens);
3409 (start_idx..end_idx)
3410 .map(|i| {
3411 format!(
3412 r#"{{
3413 "chain": "ethereum",
3414 "address": "0x{i:040x}",
3415 "symbol": "TOKEN_{i}",
3416 "decimals": 18,
3417 "tax": 0,
3418 "gas": [30000],
3419 "quality": 100
3420 }}"#
3421 )
3422 })
3423 .collect::<Vec<_>>()
3424 };
3425
3426 let tokens_json = tokens_in_page.join(",");
3427 let response = format!(
3428 r#"{{
3429 "tokens": [{tokens_json}],
3430 "pagination": {{
3431 "page": {page},
3432 "page_size": {page_size},
3433 "total": {total_tokens}
3434 }}
3435 }}"#,
3436 );
3437
3438 server
3439 .mock("POST", "/v1/tokens")
3440 .expect(1)
3441 .with_chunked_body(move |w| {
3442 let current = concurrent.fetch_add(1, Ordering::SeqCst);
3444 max_conc.fetch_max(current + 1, Ordering::SeqCst);
3445
3446 std::thread::sleep(Duration::from_millis(10));
3448
3449 concurrent.fetch_sub(1, Ordering::SeqCst);
3450
3451 w.write_all(response.as_bytes())
3452 })
3453 .create_async()
3454 .await;
3455 }
3456
3457 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3458 .expect("create client");
3459
3460 let tokens = client
3461 .get_all_tokens(
3462 AllTokensParams::new(Chain::Ethereum, allowed_concurrency)
3463 .with_chunk_size(page_size),
3464 )
3465 .await
3466 .expect("get all tokens");
3467
3468 let max = max_concurrent.load(Ordering::SeqCst);
3470 let expected_max_concurrency = (total_pages as usize)
3471 .saturating_sub(1)
3472 .min(allowed_concurrency);
3473 assert!(
3474 max <= allowed_concurrency,
3475 "Expected max concurrent requests <= {allowed_concurrency}, got {max}"
3476 );
3477
3478 if total_pages > 1 && expected_max_concurrency > 1 {
3480 assert!(
3481 max > 0,
3482 "Expected some concurrent requests for multi-page response, got {max}"
3483 );
3484 }
3485
3486 assert_eq!(
3488 tokens.len(),
3489 total_tokens,
3490 "Expected {total_tokens} tokens, got {}",
3491 tokens.len()
3492 );
3493
3494 for (i, token) in tokens.iter().enumerate() {
3496 assert_eq!(token.symbol, format!("TOKEN_{i}"), "Token at index {i} has wrong symbol");
3497 }
3498 }
3499}