1use std::{
7 collections::HashMap,
8 sync::Arc,
9 time::{Duration, SystemTime},
10};
11
12use async_trait::async_trait;
13use backoff::{exponential::ExponentialBackoffBuilder, ExponentialBackoff};
14use futures03::future::try_join_all;
15#[cfg(test)]
16use mockall::automock;
17use reqwest::{header, Client, ClientBuilder, Response, StatusCode, Url};
18use serde::Serialize;
19use thiserror::Error;
20use time::{format_description::well_known::Rfc2822, OffsetDateTime};
21use tokio::{
22 sync::{RwLock, Semaphore},
23 time::sleep,
24};
25use tracing::{debug, error, instrument, trace, warn};
26use tycho_common::{
27 dto::{
28 ComponentTvlRequestBody, ComponentTvlRequestResponse, PaginationLimits, PaginationParams,
29 ProtocolComponentRequestResponse, ProtocolComponentsRequestBody, ProtocolStateRequestBody,
30 ProtocolStateRequestResponse, ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse,
31 StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
32 TracedEntryPointRequestBody, TracedEntryPointRequestResponse, VersionParam,
33 },
34 models::{
35 blockchain::{EntryPointWithTracingParams, TracedEntryPoints, TracingResult},
36 contract::Account,
37 protocol::{ProtocolComponent, ProtocolComponentState},
38 token::Token,
39 Chain, ComponentId,
40 },
41 Bytes,
42};
43
44#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct ProtocolSystems {
47 protocol_systems: Vec<String>,
48 dci_protocols: Vec<String>,
49}
50
51impl ProtocolSystems {
52 pub(crate) fn new(protocol_systems: Vec<String>, dci_protocols: Vec<String>) -> Self {
53 Self { protocol_systems, dci_protocols }
54 }
55
56 pub fn protocol_systems(&self) -> &[String] {
57 &self.protocol_systems
58 }
59
60 pub fn dci_protocols(&self) -> &[String] {
61 &self.dci_protocols
62 }
63}
64
65#[derive(Debug, Clone, PartialEq)]
67pub struct Page<T> {
68 data: T,
69 total: i64,
70 page: i64,
71 page_size: i64,
72}
73
74impl<T> Page<T> {
75 pub fn new(data: T, total: i64, page: i64, page_size: i64) -> Self {
76 Page { data, total, page, page_size }
77 }
78
79 pub fn data(&self) -> &T {
80 &self.data
81 }
82
83 pub fn into_data(self) -> T {
84 self.data
85 }
86
87 pub fn total(&self) -> i64 {
88 self.total
89 }
90
91 pub fn page(&self) -> i64 {
92 self.page
93 }
94
95 pub fn page_size(&self) -> i64 {
96 self.page_size
97 }
98}
99
100impl<T> Page<Vec<T>> {
101 pub fn len(&self) -> usize {
102 self.data.len()
103 }
104
105 pub fn is_empty(&self) -> bool {
106 self.data.is_empty()
107 }
108}
109
110impl<K, V, S: std::hash::BuildHasher> Page<HashMap<K, V, S>> {
111 pub fn len(&self) -> usize {
112 self.data.len()
113 }
114
115 pub fn is_empty(&self) -> bool {
116 self.data.is_empty()
117 }
118}
119
120impl<T: IntoIterator> IntoIterator for Page<T> {
121 type Item = T::Item;
122 type IntoIter = T::IntoIter;
123
124 fn into_iter(self) -> Self::IntoIter {
125 self.data.into_iter()
126 }
127}
128
129impl<'a, T> IntoIterator for &'a Page<T>
130where
131 &'a T: IntoIterator,
132{
133 type Item = <&'a T as IntoIterator>::Item;
134 type IntoIter = <&'a T as IntoIterator>::IntoIter;
135
136 fn into_iter(self) -> Self::IntoIter {
137 (&self.data).into_iter()
138 }
139}
140
141use crate::{
142 feed::synchronizer::{ComponentWithState, Snapshot},
143 TYCHO_SERVER_VERSION,
144};
145
146pub const RPC_CLIENT_CONCURRENCY: usize = 4;
148
149#[derive(Clone, PartialEq, Debug)]
151pub struct ContractStateParams {
152 chain: Chain,
153 protocol_system: String,
154 contract_ids: Option<Vec<Bytes>>,
155 version: VersionParam,
156 page: i64,
157 page_size: i64,
158}
159
160impl ContractStateParams {
161 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
162 Self {
163 chain,
164 protocol_system: protocol_system.into(),
165 contract_ids: None,
166 version: VersionParam::default(),
167 page: 0,
168 page_size: StateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
169 }
170 }
171
172 pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
173 self.contract_ids = Some(ids);
174 self
175 }
176
177 pub fn with_version(mut self, version: VersionParam) -> Self {
178 self.version = version;
179 self
180 }
181
182 pub fn with_block_number(mut self, block_number: u64) -> Self {
183 self.version = VersionParam::at_block(self.chain.into(), block_number);
184 self
185 }
186
187 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
188 self.page = page;
189 self.page_size = page_size;
190 self
191 }
192}
193
194#[derive(Clone, PartialEq, Debug)]
196pub struct ProtocolComponentsParams {
197 chain: Chain,
198 protocol_system: String,
199 component_ids: Option<Vec<ComponentId>>,
200 tvl_gt: Option<f64>,
201 page: i64,
202 page_size: i64,
203}
204
205impl ProtocolComponentsParams {
206 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
207 Self {
208 chain,
209 protocol_system: protocol_system.into(),
210 component_ids: None,
211 tvl_gt: None,
212 page: 0,
213 page_size: ProtocolComponentsRequestBody::MAX_PAGE_SIZE_COMPRESSED,
214 }
215 }
216
217 pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
218 self.component_ids = Some(ids);
219 self
220 }
221
222 pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
223 self.tvl_gt = Some(tvl_gt);
224 self
225 }
226
227 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
228 self.page = page;
229 self.page_size = page_size;
230 self
231 }
232
233 #[cfg(test)]
234 pub(crate) fn component_ids(&self) -> Option<&Vec<ComponentId>> {
235 self.component_ids.as_ref()
236 }
237}
238
239#[derive(Clone, PartialEq, Debug)]
241pub struct ProtocolStatesParams {
242 chain: Chain,
243 protocol_system: String,
244 protocol_ids: Option<Vec<String>>,
245 include_balances: bool,
246 version: VersionParam,
247 page: i64,
248 page_size: i64,
249}
250
251impl ProtocolStatesParams {
252 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
253 Self {
254 chain,
255 protocol_system: protocol_system.into(),
256 protocol_ids: None,
257 include_balances: false,
258 version: VersionParam::default(),
259 page: 0,
260 page_size: ProtocolStateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
261 }
262 }
263
264 pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
265 self.protocol_ids = Some(ids);
266 self
267 }
268
269 pub fn with_include_balances(mut self, include_balances: bool) -> Self {
270 self.include_balances = include_balances;
271 self
272 }
273
274 pub fn with_version(mut self, version: VersionParam) -> Self {
275 self.version = version;
276 self
277 }
278
279 pub fn with_block_number(mut self, block_number: u64) -> Self {
280 self.version = VersionParam::at_block(self.chain.into(), block_number);
281 self
282 }
283
284 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
285 self.page = page;
286 self.page_size = page_size;
287 self
288 }
289}
290
291#[derive(Clone, PartialEq, Debug)]
293pub struct TokensParams {
294 chain: Chain,
295 min_quality: Option<i32>,
296 traded_n_days_ago: Option<u64>,
297 page: i64,
298 page_size: i64,
299}
300
301impl TokensParams {
302 pub fn new(chain: Chain) -> Self {
303 Self {
304 chain,
305 min_quality: None,
306 traded_n_days_ago: None,
307 page: 0,
308 page_size: TokensRequestBody::MAX_PAGE_SIZE_COMPRESSED,
309 }
310 }
311
312 pub fn with_min_quality(mut self, min_quality: i32) -> Self {
313 self.min_quality = Some(min_quality);
314 self
315 }
316
317 pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
318 self.traded_n_days_ago = Some(days);
319 self
320 }
321
322 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
323 self.page = page;
324 self.page_size = page_size;
325 self
326 }
327}
328
329#[derive(Clone, PartialEq, Debug)]
331pub struct ProtocolSystemsParams {
332 chain: Chain,
333 page: i64,
334 page_size: i64,
335}
336
337impl ProtocolSystemsParams {
338 pub fn new(chain: Chain) -> Self {
339 Self { chain, page: 0, page_size: ProtocolSystemsRequestBody::MAX_PAGE_SIZE_COMPRESSED }
340 }
341
342 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
343 self.page = page;
344 self.page_size = page_size;
345 self
346 }
347}
348
349#[derive(Clone, PartialEq, Debug)]
351pub struct ComponentTvlParams {
352 chain: Chain,
353 protocol_system: Option<String>,
354 component_ids: Option<Vec<String>>,
355 page: i64,
356 page_size: i64,
357}
358
359impl ComponentTvlParams {
360 pub fn new(chain: Chain) -> Self {
361 Self {
362 chain,
363 protocol_system: None,
364 component_ids: None,
365 page: 0,
366 page_size: ComponentTvlRequestBody::MAX_PAGE_SIZE_COMPRESSED,
367 }
368 }
369
370 pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
371 self.protocol_system = Some(protocol_system.into());
372 self
373 }
374
375 pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
376 self.component_ids = Some(ids);
377 self
378 }
379
380 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
381 self.page = page;
382 self.page_size = page_size;
383 self
384 }
385}
386
387#[derive(Clone, PartialEq, Debug)]
389pub struct TracedEntryPointsParams {
390 chain: Chain,
391 protocol_system: String,
392 component_ids: Option<Vec<String>>,
393 page: i64,
394 page_size: i64,
395}
396
397impl TracedEntryPointsParams {
398 pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
399 Self {
400 chain,
401 protocol_system: protocol_system.into(),
402 component_ids: None,
403 page: 0,
404 page_size: TracedEntryPointRequestBody::MAX_PAGE_SIZE_COMPRESSED,
405 }
406 }
407
408 pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
409 self.component_ids = Some(ids);
410 self
411 }
412
413 pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
414 self.page = page;
415 self.page_size = page_size;
416 self
417 }
418}
419
420#[derive(Clone, PartialEq, Debug)]
422pub struct ProtocolComponentsPaginatedParams {
423 chain: Chain,
424 protocol_system: String,
425 component_ids: Option<Vec<ComponentId>>,
426 tvl_gt: Option<f64>,
427 chunk_size: Option<usize>,
428 concurrency: usize,
429}
430
431impl ProtocolComponentsPaginatedParams {
432 pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
433 Self {
434 chain,
435 protocol_system: protocol_system.into(),
436 component_ids: None,
437 tvl_gt: None,
438 chunk_size: None,
439 concurrency,
440 }
441 }
442
443 pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
444 self.component_ids = Some(ids);
445 self
446 }
447
448 pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
449 self.tvl_gt = Some(tvl_gt);
450 self
451 }
452
453 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
454 self.chunk_size = Some(chunk_size);
455 self
456 }
457}
458
459#[derive(Clone, PartialEq, Debug)]
461pub struct TracedEntryPointsPaginatedParams {
462 chain: Chain,
463 protocol_system: String,
464 component_ids: Vec<String>,
465 chunk_size: Option<usize>,
466 concurrency: usize,
467}
468
469impl TracedEntryPointsPaginatedParams {
470 pub fn new(
471 chain: Chain,
472 protocol_system: impl Into<String>,
473 component_ids: Vec<String>,
474 concurrency: usize,
475 ) -> Self {
476 Self {
477 chain,
478 protocol_system: protocol_system.into(),
479 component_ids,
480 chunk_size: None,
481 concurrency,
482 }
483 }
484
485 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
486 self.chunk_size = Some(chunk_size);
487 self
488 }
489}
490
491#[derive(Clone, PartialEq, Debug)]
493pub struct ProtocolStatesPaginatedParams {
494 chain: Chain,
495 protocol_system: String,
496 protocol_ids: Vec<String>,
497 include_balances: bool,
498 version: VersionParam,
499 chunk_size: Option<usize>,
500 concurrency: usize,
501}
502
503impl ProtocolStatesPaginatedParams {
504 pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
505 Self {
506 chain,
507 protocol_system: protocol_system.into(),
508 protocol_ids: Vec::new(),
509 include_balances: true,
510 version: VersionParam::default(),
511 chunk_size: None,
512 concurrency,
513 }
514 }
515
516 pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
517 self.protocol_ids = ids;
518 self
519 }
520
521 pub fn with_include_balances(mut self, include_balances: bool) -> Self {
522 self.include_balances = include_balances;
523 self
524 }
525
526 pub fn with_version(mut self, version: VersionParam) -> Self {
527 self.version = version;
528 self
529 }
530
531 pub fn with_block_number(mut self, block_number: u64) -> Self {
532 self.version = VersionParam::at_block(self.chain.into(), block_number);
533 self
534 }
535
536 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
537 self.chunk_size = Some(chunk_size);
538 self
539 }
540}
541
542#[derive(Clone, PartialEq, Debug)]
544pub struct AllTokensParams {
545 chain: Chain,
546 min_quality: Option<i32>,
547 traded_n_days_ago: Option<u64>,
548 chunk_size: Option<usize>,
549 concurrency: usize,
550}
551
552impl AllTokensParams {
553 pub fn new(chain: Chain, concurrency: usize) -> Self {
554 Self { chain, min_quality: None, traded_n_days_ago: None, chunk_size: None, concurrency }
555 }
556
557 pub fn with_min_quality(mut self, min_quality: i32) -> Self {
558 self.min_quality = Some(min_quality);
559 self
560 }
561
562 pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
563 self.traded_n_days_ago = Some(days);
564 self
565 }
566
567 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
568 self.chunk_size = Some(chunk_size);
569 self
570 }
571}
572
573#[derive(Clone, PartialEq, Debug)]
575pub struct ComponentTvlPaginatedParams {
576 chain: Chain,
577 protocol_system: Option<String>,
578 component_ids: Option<Vec<String>>,
579 chunk_size: Option<usize>,
580 concurrency: usize,
581}
582
583impl ComponentTvlPaginatedParams {
584 pub fn new(chain: Chain, concurrency: usize) -> Self {
585 Self { chain, protocol_system: None, component_ids: None, chunk_size: None, concurrency }
586 }
587
588 pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
589 self.protocol_system = Some(protocol_system.into());
590 self
591 }
592
593 pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
594 self.component_ids = Some(ids);
595 self
596 }
597
598 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
599 self.chunk_size = Some(chunk_size);
600 self
601 }
602}
603
604#[derive(Clone, PartialEq, Debug)]
606pub struct ContractStatePaginatedParams {
607 chain: Chain,
608 protocol_system: String,
609 contract_ids: Vec<Bytes>,
610 version: VersionParam,
611 chunk_size: Option<usize>,
612 concurrency: usize,
613}
614
615impl ContractStatePaginatedParams {
616 pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
617 Self {
618 chain,
619 protocol_system: protocol_system.into(),
620 contract_ids: Vec::new(),
621 version: VersionParam::default(),
622 chunk_size: None,
623 concurrency,
624 }
625 }
626
627 pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
628 self.contract_ids = ids;
629 self
630 }
631
632 pub fn with_version(mut self, version: VersionParam) -> Self {
633 self.version = version;
634 self
635 }
636
637 pub fn with_block_number(mut self, block_number: u64) -> Self {
638 self.version = VersionParam::at_block(self.chain.into(), block_number);
639 self
640 }
641
642 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
643 self.chunk_size = Some(chunk_size);
644 self
645 }
646}
647
648#[derive(Clone, Debug, PartialEq)]
653pub struct SnapshotParameters<'a> {
654 pub chain: Chain,
656 pub protocol_system: &'a str,
658 pub components: &'a HashMap<ComponentId, ProtocolComponent>,
660 pub entrypoints: Option<&'a TracedEntryPoints>,
662 pub contract_ids: &'a [Bytes],
664 pub block_number: u64,
666 pub include_balances: bool,
668 pub include_tvl: bool,
670}
671
672impl<'a> SnapshotParameters<'a> {
673 pub fn new(
674 chain: Chain,
675 protocol_system: &'a str,
676 components: &'a HashMap<ComponentId, ProtocolComponent>,
677 contract_ids: &'a [Bytes],
678 block_number: u64,
679 ) -> Self {
680 Self {
681 chain,
682 protocol_system,
683 components,
684 entrypoints: None,
685 contract_ids,
686 block_number,
687 include_balances: true,
688 include_tvl: true,
689 }
690 }
691
692 pub fn include_balances(mut self, include_balances: bool) -> Self {
694 self.include_balances = include_balances;
695 self
696 }
697
698 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
700 self.include_tvl = include_tvl;
701 self
702 }
703
704 pub fn entrypoints(mut self, entrypoints: &'a TracedEntryPoints) -> Self {
705 self.entrypoints = Some(entrypoints);
706 self
707 }
708}
709
710#[derive(Error, Debug)]
711pub enum RPCError {
712 #[error("Failed to parse URL: {0}. Error: {1}")]
714 UrlParsing(String, String),
715
716 #[error("Failed to format request: {0}")]
718 FormatRequest(String),
719
720 #[error("Unexpected HTTP client error: {0}")]
722 HttpClient(String, #[source] reqwest::Error),
723
724 #[error("Failed to parse response: {0}")]
726 ParseResponse(String),
727
728 #[error("Snapshot block is stale: {0}")]
730 StaleBlock(String),
731
732 #[error("Unknown extractor: {0}")]
734 UnknownExtractor(String),
735
736 #[error("Fatal error: {0}")]
738 Fatal(String),
739
740 #[error("Rate limited until {0:?}")]
741 RateLimited(Option<SystemTime>),
742
743 #[error("Server unreachable: {0}")]
744 ServerUnreachable(String),
745}
746
747impl RPCError {
748 fn from_parse_error(err: serde_json::Error, body: &str) -> Self {
758 if body.contains("version is older than") || body.contains("Could not find Block") {
759 RPCError::StaleBlock(body.to_string())
760 } else if body.starts_with("Unknown extractor:") {
761 RPCError::UnknownExtractor(body.to_string())
762 } else {
763 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
764 }
765 }
766}
767
768#[cfg_attr(test, automock)]
769#[async_trait]
770pub trait RPCClient: Send + Sync {
771 fn compression(&self) -> bool;
773
774 async fn get_contract_state(
778 &self,
779 params: ContractStateParams,
780 ) -> Result<Page<Vec<Account>>, RPCError>;
781
782 async fn get_contract_state_paginated(
786 &self,
787 params: ContractStatePaginatedParams,
788 ) -> Result<Vec<Account>, RPCError> {
789 let semaphore = Arc::new(Semaphore::new(params.concurrency));
790
791 let mut sorted_ids = params.contract_ids;
793 sorted_ids.sort();
794
795 let chunk_size = params
796 .chunk_size
797 .unwrap_or(StateRequestBody::effective_max_page_size(self.compression()) as usize);
798
799 let mut tasks = Vec::new();
800 for chunk in sorted_ids.chunks(chunk_size) {
801 let sem = semaphore.clone();
802 let base_params =
803 ContractStateParams::new(params.chain, params.protocol_system.as_str())
804 .with_contract_ids(chunk.to_vec())
805 .with_version(params.version.clone())
806 .with_pagination(0, chunk_size as i64);
807 tasks.push(async move {
808 let _permit = sem
809 .acquire()
810 .await
811 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
812 self.get_contract_state(base_params)
813 .await
814 });
815 }
816
817 let pages = try_join_all(tasks).await?;
818
819 let accounts = pages
820 .into_iter()
821 .flat_map(|p| p.into_iter())
822 .collect();
823
824 Ok(accounts)
825 }
826
827 async fn get_protocol_components(
831 &self,
832 params: ProtocolComponentsParams,
833 ) -> Result<Page<Vec<ProtocolComponent>>, RPCError>;
834
835 async fn get_protocol_components_paginated(
839 &self,
840 params: ProtocolComponentsPaginatedParams,
841 ) -> Result<Vec<ProtocolComponent>, RPCError> {
842 let chain = params.chain;
843 let protocol_system = params.protocol_system;
844 let component_ids = params.component_ids;
845 let tvl_gt = params.tvl_gt;
846 let chunk_size = params.chunk_size;
847 let concurrency = params.concurrency;
848
849 let semaphore = Arc::new(Semaphore::new(concurrency));
850
851 let chunk_size = chunk_size.unwrap_or(
852 ProtocolComponentsRequestBody::effective_max_page_size(self.compression()) as usize,
853 );
854
855 match component_ids {
858 Some(ids) => {
859 let tasks: Vec<_> =
860 ids.chunks(chunk_size)
861 .enumerate()
862 .map(|(index, chunk)| {
863 let sem = semaphore.clone();
864 let mut base =
865 ProtocolComponentsParams::new(chain, protocol_system.as_str())
866 .with_component_ids(chunk.to_vec())
867 .with_pagination(index as i64, chunk_size as i64);
868 if let Some(tvl) = tvl_gt {
869 base = base.with_tvl_gt(tvl);
870 }
871 async move {
872 let _permit = sem.acquire().await.map_err(|_| {
873 RPCError::Fatal("Semaphore dropped".to_string())
874 })?;
875 self.get_protocol_components(base).await
876 }
877 })
878 .collect();
879
880 try_join_all(tasks)
881 .await
882 .map(|pages| pages.into_iter().flatten().collect())
883 }
884 None => {
885 let mut base_params =
888 ProtocolComponentsParams::new(chain, protocol_system.as_str())
889 .with_pagination(0, chunk_size as i64);
890 if let Some(tvl) = tvl_gt {
891 base_params = base_params.with_tvl_gt(tvl);
892 }
893
894 let first_page = self
895 .get_protocol_components(base_params)
896 .await?;
897
898 let total_items = first_page.total();
899 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
900
901 let mut all: Vec<ProtocolComponent> = first_page.into_data();
902
903 let mut page = 1;
904 while page < total_pages {
905 let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
906
907 let tasks: Vec<_> = (0..requests_in_this_iteration)
908 .map(|iter| {
909 let sem = semaphore.clone();
910 let mut p =
911 ProtocolComponentsParams::new(chain, protocol_system.as_str())
912 .with_pagination(page + iter, chunk_size as i64);
913 if let Some(tvl) = tvl_gt {
914 p = p.with_tvl_gt(tvl);
915 }
916 async move {
917 let _permit = sem.acquire().await.map_err(|_| {
918 RPCError::Fatal("Semaphore dropped".to_string())
919 })?;
920 self.get_protocol_components(p).await
921 }
922 })
923 .collect();
924
925 let responses = try_join_all(tasks).await?;
926
927 for resp in responses {
928 all.extend(resp);
929 }
930
931 page += requests_in_this_iteration;
932 }
933 Ok(all)
934 }
935 }
936 }
937
938 async fn get_protocol_states(
942 &self,
943 params: ProtocolStatesParams,
944 ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError>;
945
946 async fn get_protocol_states_paginated(
950 &self,
951 params: ProtocolStatesPaginatedParams,
952 ) -> Result<Vec<ProtocolComponentState>, RPCError> {
953 let semaphore = Arc::new(Semaphore::new(params.concurrency));
954
955 let chunk_size =
956 params
957 .chunk_size
958 .unwrap_or(
959 ProtocolStateRequestBody::effective_max_page_size(self.compression()) as usize
960 );
961
962 let tasks: Vec<_> = params
963 .protocol_ids
964 .chunks(chunk_size)
965 .map(|c| {
966 let sem = semaphore.clone();
967 let p = ProtocolStatesParams::new(params.chain, params.protocol_system.as_str())
968 .with_protocol_ids(c.to_vec())
969 .with_include_balances(params.include_balances)
970 .with_version(params.version.clone())
971 .with_pagination(0, chunk_size as i64);
972 async move {
973 let _permit = sem
974 .acquire()
975 .await
976 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
977 self.get_protocol_states(p).await
978 }
979 })
980 .collect();
981
982 try_join_all(tasks)
983 .await
984 .map(|pages| pages.into_iter().flatten().collect())
985 }
986
987 async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError>;
991
992 async fn get_all_tokens(&self, params: AllTokensParams) -> Result<Vec<Token>, RPCError> {
996 let chunk_size = params
997 .chunk_size
998 .unwrap_or(TokensRequestBody::effective_max_page_size(self.compression()) as usize);
999
1000 let semaphore = Arc::new(Semaphore::new(params.concurrency));
1001
1002 let page_size: i64 = chunk_size.try_into().map_err(|_| {
1003 RPCError::FormatRequest("Failed to convert chunk_size into i64".to_string())
1004 })?;
1005
1006 let mut base_params = TokensParams::new(params.chain).with_pagination(0, page_size);
1007 if let Some(q) = params.min_quality {
1008 base_params = base_params.with_min_quality(q);
1009 }
1010 if let Some(d) = params.traded_n_days_ago {
1011 base_params = base_params.with_traded_n_days_ago(d);
1012 }
1013
1014 let first_page = self.get_tokens(base_params).await?;
1015 let total_pages = (first_page.total() as f64 / chunk_size as f64).ceil() as i64;
1016
1017 let mut all_tokens: Vec<Token> = first_page.into_data();
1018
1019 if total_pages <= 1 {
1020 return Ok(all_tokens);
1021 }
1022
1023 let tasks: Vec<_> = (1..total_pages)
1024 .map(|page| {
1025 let sem = semaphore.clone();
1026 let mut p = TokensParams::new(params.chain).with_pagination(page, page_size);
1027 if let Some(q) = params.min_quality {
1028 p = p.with_min_quality(q);
1029 }
1030 if let Some(d) = params.traded_n_days_ago {
1031 p = p.with_traded_n_days_ago(d);
1032 }
1033 async move {
1034 let _permit = sem
1035 .acquire()
1036 .await
1037 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1038 self.get_tokens(p).await
1039 }
1040 })
1041 .collect();
1042
1043 let pages = try_join_all(tasks).await?;
1044 for page in pages {
1045 all_tokens.extend(page);
1046 }
1047
1048 Ok(all_tokens)
1049 }
1050
1051 async fn get_protocol_systems(
1053 &self,
1054 params: ProtocolSystemsParams,
1055 ) -> Result<Page<ProtocolSystems>, RPCError>;
1056
1057 async fn get_component_tvl(
1061 &self,
1062 params: ComponentTvlParams,
1063 ) -> Result<Page<HashMap<String, f64>>, RPCError>;
1064
1065 async fn get_component_tvl_paginated(
1069 &self,
1070 params: ComponentTvlPaginatedParams,
1071 ) -> Result<HashMap<String, f64>, RPCError> {
1072 let semaphore = Arc::new(Semaphore::new(params.concurrency));
1073
1074 let chunk_size =
1075 params
1076 .chunk_size
1077 .unwrap_or(
1078 ComponentTvlRequestBody::effective_max_page_size(self.compression()) as usize
1079 );
1080
1081 match params.component_ids {
1082 Some(ids) => {
1083 let tasks: Vec<_> =
1084 ids.chunks(chunk_size)
1085 .enumerate()
1086 .map(|(index, chunk)| {
1087 let sem = semaphore.clone();
1088 let mut p = ComponentTvlParams::new(params.chain)
1089 .with_component_ids(chunk.to_vec())
1090 .with_pagination(index as i64, chunk_size as i64);
1091 if let Some(ref ps) = params.protocol_system {
1092 p = p.with_protocol_system(ps.as_str());
1093 }
1094 async move {
1095 let _permit = sem.acquire().await.map_err(|_| {
1096 RPCError::Fatal("Semaphore dropped".to_string())
1097 })?;
1098 self.get_component_tvl(p).await
1099 }
1100 })
1101 .collect();
1102
1103 let pages = try_join_all(tasks).await?;
1104
1105 let mut merged_tvl = HashMap::new();
1106 for page in pages {
1107 for (key, value) in page {
1108 *merged_tvl.entry(key).or_insert(0.0) = value;
1109 }
1110 }
1111
1112 Ok(merged_tvl)
1113 }
1114 None => {
1115 let mut base =
1116 ComponentTvlParams::new(params.chain).with_pagination(0, chunk_size as i64);
1117 if let Some(ref ps) = params.protocol_system {
1118 base = base.with_protocol_system(ps.as_str());
1119 }
1120
1121 let first_page = self.get_component_tvl(base).await?;
1122 let total_items = first_page.total();
1123 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
1124
1125 let mut merged_tvl: HashMap<String, f64> = first_page.into_data();
1126
1127 let mut page = 1;
1128 while page < total_pages {
1129 let requests_in_this_iteration =
1130 (total_pages - page).min(params.concurrency as i64);
1131
1132 let tasks: Vec<_> = (0..requests_in_this_iteration)
1133 .map(|i| {
1134 let sem = semaphore.clone();
1135 let mut p = ComponentTvlParams::new(params.chain)
1136 .with_pagination(page + i, chunk_size as i64);
1137 if let Some(ref ps) = params.protocol_system {
1138 p = p.with_protocol_system(ps.as_str());
1139 }
1140 async move {
1141 let _permit = sem.acquire().await.map_err(|_| {
1142 RPCError::Fatal("Semaphore dropped".to_string())
1143 })?;
1144 self.get_component_tvl(p).await
1145 }
1146 })
1147 .collect();
1148
1149 let responses = try_join_all(tasks).await?;
1150
1151 for resp in responses {
1152 for (key, value) in resp {
1153 *merged_tvl.entry(key).or_insert(0.0) = value;
1154 }
1155 }
1156
1157 page += requests_in_this_iteration;
1158 }
1159
1160 Ok(merged_tvl)
1161 }
1162 }
1163 }
1164
1165 async fn get_traced_entry_points(
1169 &self,
1170 params: TracedEntryPointsParams,
1171 ) -> Result<Page<TracedEntryPoints>, RPCError>;
1172
1173 async fn get_traced_entry_points_paginated(
1177 &self,
1178 params: TracedEntryPointsPaginatedParams,
1179 ) -> Result<TracedEntryPoints, RPCError> {
1180 let chain = params.chain;
1181 let protocol_system = params.protocol_system;
1182 let component_ids = params.component_ids;
1183 let chunk_size = params.chunk_size;
1184 let concurrency = params.concurrency;
1185
1186 let semaphore = Arc::new(Semaphore::new(concurrency));
1187
1188 let chunk_size = chunk_size.unwrap_or(
1189 TracedEntryPointRequestBody::effective_max_page_size(self.compression()) as usize,
1190 );
1191
1192 let tasks: Vec<_> = component_ids
1193 .chunks(chunk_size)
1194 .map(|c| {
1195 let sem = semaphore.clone();
1196 let params = TracedEntryPointsParams::new(chain, protocol_system.as_str())
1197 .with_component_ids(c.to_vec())
1198 .with_pagination(0, chunk_size as i64);
1199 async move {
1200 let _permit = sem
1201 .acquire()
1202 .await
1203 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1204 self.get_traced_entry_points(params)
1205 .await
1206 }
1207 })
1208 .collect();
1209
1210 try_join_all(tasks)
1211 .await
1212 .map(|pages| pages.into_iter().flatten().collect())
1213 }
1214
1215 async fn get_snapshots<'a>(
1216 &self,
1217 request: &SnapshotParameters<'a>,
1218 chunk_size: Option<usize>,
1219 concurrency: usize,
1220 ) -> Result<Snapshot, RPCError>;
1221}
1222
1223#[derive(Debug, Clone)]
1225pub struct HttpRPCClientOptions {
1226 pub auth_key: Option<String>,
1228 pub compression: bool,
1231}
1232
1233impl Default for HttpRPCClientOptions {
1234 fn default() -> Self {
1235 Self::new()
1236 }
1237}
1238
1239impl HttpRPCClientOptions {
1240 pub fn new() -> Self {
1242 Self { auth_key: None, compression: true }
1243 }
1244
1245 pub fn with_auth_key(mut self, auth_key: Option<String>) -> Self {
1247 self.auth_key = auth_key;
1248 self
1249 }
1250
1251 pub fn with_compression(mut self, compression: bool) -> Self {
1253 self.compression = compression;
1254 self
1255 }
1256}
1257
1258#[derive(Debug, Clone)]
1259pub struct HttpRPCClient {
1260 http_client: Client,
1261 url: Url,
1262 retry_after: Arc<RwLock<Option<SystemTime>>>,
1263 backoff_policy: ExponentialBackoff,
1264 server_restart_duration: Duration,
1265 compression: bool,
1266}
1267
1268impl HttpRPCClient {
1269 pub fn new(base_uri: &str, options: HttpRPCClientOptions) -> Result<Self, RPCError> {
1270 let uri = base_uri
1271 .parse::<Url>()
1272 .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
1273
1274 let mut headers = header::HeaderMap::new();
1276 headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
1277 let user_agent = format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION"));
1278 headers.insert(
1279 header::USER_AGENT,
1280 header::HeaderValue::from_str(&user_agent)
1281 .map_err(|e| RPCError::FormatRequest(format!("Invalid user agent format: {e}")))?,
1282 );
1283
1284 if let Some(key) = options.auth_key.as_deref() {
1286 let mut auth_value = header::HeaderValue::from_str(key).map_err(|e| {
1287 RPCError::FormatRequest(format!("Invalid authorization key format: {e}"))
1288 })?;
1289 auth_value.set_sensitive(true);
1290 headers.insert(header::AUTHORIZATION, auth_value);
1291 }
1292
1293 let mut client_builder = ClientBuilder::new()
1294 .default_headers(headers)
1295 .http2_prior_knowledge();
1296
1297 if !options.compression {
1299 client_builder = client_builder.no_zstd();
1300 }
1301
1302 let client = client_builder
1303 .build()
1304 .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1305
1306 Ok(Self {
1307 http_client: client,
1308 url: uri,
1309 retry_after: Arc::new(RwLock::new(None)),
1310 backoff_policy: ExponentialBackoffBuilder::new()
1311 .with_initial_interval(Duration::from_millis(250))
1312 .with_multiplier(1.75)
1314 .with_max_interval(Duration::from_secs(30))
1316 .with_max_elapsed_time(Some(Duration::from_secs(125)))
1318 .build(),
1319 server_restart_duration: Duration::from_secs(120),
1320 compression: options.compression,
1321 })
1322 }
1323
1324 #[cfg(test)]
1325 pub fn with_test_backoff_policy(mut self) -> Self {
1326 self.backoff_policy = ExponentialBackoffBuilder::new()
1328 .with_initial_interval(Duration::from_millis(1))
1329 .with_multiplier(1.1)
1330 .with_max_interval(Duration::from_millis(5))
1331 .with_max_elapsed_time(Some(Duration::from_millis(50)))
1332 .build();
1333 self.server_restart_duration = Duration::from_millis(50);
1334 self
1335 }
1336
1337 async fn error_for_response(
1343 &self,
1344 response: reqwest::Response,
1345 ) -> Result<reqwest::Response, RPCError> {
1346 match response.status() {
1347 StatusCode::TOO_MANY_REQUESTS => {
1348 let retry_after_raw = response
1349 .headers()
1350 .get(reqwest::header::RETRY_AFTER)
1351 .and_then(|h| h.to_str().ok())
1352 .and_then(parse_retry_value);
1353
1354 let reason = response
1355 .text()
1356 .await
1357 .unwrap_or_default();
1358 warn!(reason, retry_after = ?retry_after_raw, "Rate limited by server");
1359
1360 Err(RPCError::RateLimited(retry_after_raw))
1361 }
1362 StatusCode::BAD_GATEWAY |
1363 StatusCode::SERVICE_UNAVAILABLE |
1364 StatusCode::GATEWAY_TIMEOUT => Err(RPCError::ServerUnreachable(
1365 response
1366 .text()
1367 .await
1368 .unwrap_or_else(|_| "Server Unreachable".to_string()),
1369 )),
1370 _ => Ok(response),
1371 }
1372 }
1373
1374 async fn handle_error_for_backoff(&self, e: RPCError) -> backoff::Error<RPCError> {
1380 match e {
1381 RPCError::ServerUnreachable(_) => {
1382 backoff::Error::retry_after(e, self.server_restart_duration)
1383 }
1384 RPCError::RateLimited(Some(until)) => {
1385 let mut retry_after_guard = self.retry_after.write().await;
1386 *retry_after_guard = Some(
1387 retry_after_guard
1388 .unwrap_or(until)
1389 .max(until),
1390 );
1391
1392 if let Ok(duration) = until.duration_since(SystemTime::now()) {
1393 backoff::Error::retry_after(e, duration)
1394 } else {
1395 e.into()
1396 }
1397 }
1398 RPCError::RateLimited(None) => e.into(),
1399 _ => backoff::Error::permanent(e),
1400 }
1401 }
1402
1403 async fn wait_until_retry_after(&self) {
1408 if let Some(&until) = self.retry_after.read().await.as_ref() {
1409 let now = SystemTime::now();
1410 if until > now {
1411 if let Ok(duration) = until.duration_since(now) {
1412 sleep(duration).await
1413 }
1414 }
1415 }
1416 }
1417
1418 async fn make_post_request<T: Serialize + ?Sized>(
1423 &self,
1424 request: &T,
1425 uri: &String,
1426 ) -> Result<Response, RPCError> {
1427 self.wait_until_retry_after().await;
1428 let response = backoff::future::retry(self.backoff_policy.clone(), || async {
1429 let server_response = self
1430 .http_client
1431 .post(uri)
1432 .json(request)
1433 .send()
1434 .await
1435 .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1436
1437 match self
1438 .error_for_response(server_response)
1439 .await
1440 {
1441 Ok(response) => Ok(response),
1442 Err(e) => Err(self.handle_error_for_backoff(e).await),
1443 }
1444 })
1445 .await?;
1446 Ok(response)
1447 }
1448}
1449
1450fn parse_retry_value(val: &str) -> Option<SystemTime> {
1451 if let Ok(secs) = val.parse::<u64>() {
1452 return Some(SystemTime::now() + Duration::from_secs(secs));
1453 }
1454 if let Ok(date) = OffsetDateTime::parse(val, &Rfc2822) {
1455 return Some(date.into());
1456 }
1457 None
1458}
1459
1460#[async_trait]
1461impl RPCClient for HttpRPCClient {
1462 fn compression(&self) -> bool {
1463 self.compression
1464 }
1465
1466 #[instrument(skip(self))]
1467 async fn get_contract_state(
1468 &self,
1469 params: ContractStateParams,
1470 ) -> Result<Page<Vec<Account>>, RPCError> {
1471 if params
1472 .contract_ids
1473 .as_ref()
1474 .is_none_or(|ids| ids.is_empty())
1475 {
1476 warn!("No contract ids specified in request.");
1477 }
1478
1479 let request = StateRequestBody {
1480 contract_ids: params.contract_ids,
1481 protocol_system: params.protocol_system,
1482 chain: params.chain.into(),
1483 version: params.version,
1484 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1485 };
1486
1487 let uri = format!(
1488 "{}/{}/contract_state",
1489 self.url
1490 .to_string()
1491 .trim_end_matches('/'),
1492 TYCHO_SERVER_VERSION
1493 );
1494 debug!(%uri, "Sending contract_state request to Tycho server");
1495 trace!(?request, "Sending request to Tycho server");
1496 let response = self
1497 .make_post_request(&request, &uri)
1498 .await?;
1499 trace!(?response, "Received response from Tycho server");
1500
1501 let body = response
1502 .text()
1503 .await
1504 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1505 if body.is_empty() {
1506 return Ok(Page::new(vec![], 0, 0, 0));
1508 }
1509
1510 let dto_response = serde_json::from_str::<StateRequestResponse>(&body)
1511 .map_err(|err| RPCError::from_parse_error(err, &body))?;
1512 trace!(?dto_response, "Received contract_state response from Tycho server");
1513
1514 let data: Vec<Account> = dto_response
1515 .accounts
1516 .into_iter()
1517 .map(Account::from)
1518 .collect();
1519 Ok(Page::new(
1520 data,
1521 dto_response.pagination.total,
1522 dto_response.pagination.page,
1523 dto_response.pagination.page_size,
1524 ))
1525 }
1526
1527 async fn get_protocol_components(
1528 &self,
1529 params: ProtocolComponentsParams,
1530 ) -> Result<Page<Vec<ProtocolComponent>>, RPCError> {
1531 let request = ProtocolComponentsRequestBody {
1532 protocol_system: params.protocol_system,
1533 component_ids: params.component_ids,
1534 tvl_gt: params.tvl_gt,
1535 chain: params.chain.into(),
1536 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1537 };
1538
1539 let uri = format!(
1540 "{}/{}/protocol_components",
1541 self.url
1542 .to_string()
1543 .trim_end_matches('/'),
1544 TYCHO_SERVER_VERSION,
1545 );
1546 debug!(%uri, "Sending protocol_components request to Tycho server");
1547 trace!(?request, "Sending request to Tycho server");
1548
1549 let response = self
1550 .make_post_request(&request, &uri)
1551 .await?;
1552
1553 trace!(?response, "Received response from Tycho server");
1554
1555 let body = response
1556 .text()
1557 .await
1558 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1559 let dto_response = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
1560 .map_err(|err| RPCError::from_parse_error(err, &body))?;
1561 trace!(?dto_response, "Received protocol_components response from Tycho server");
1562
1563 let data: Vec<ProtocolComponent> = dto_response
1564 .protocol_components
1565 .into_iter()
1566 .map(ProtocolComponent::from)
1567 .collect();
1568 Ok(Page::new(
1569 data,
1570 dto_response.pagination.total,
1571 dto_response.pagination.page,
1572 dto_response.pagination.page_size,
1573 ))
1574 }
1575
1576 async fn get_protocol_states(
1577 &self,
1578 params: ProtocolStatesParams,
1579 ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError> {
1580 if params
1581 .protocol_ids
1582 .as_ref()
1583 .is_none_or(|ids| ids.is_empty())
1584 {
1585 warn!("No protocol ids specified in request.");
1586 }
1587
1588 let request = ProtocolStateRequestBody {
1589 protocol_ids: params.protocol_ids,
1590 protocol_system: params.protocol_system,
1591 chain: params.chain.into(),
1592 include_balances: params.include_balances,
1593 version: params.version,
1594 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1595 };
1596
1597 let uri = format!(
1598 "{}/{}/protocol_state",
1599 self.url
1600 .to_string()
1601 .trim_end_matches('/'),
1602 TYCHO_SERVER_VERSION
1603 );
1604 debug!(%uri, "Sending protocol_states request to Tycho server");
1605 trace!(?request, "Sending request to Tycho server");
1606
1607 let response = self
1608 .make_post_request(&request, &uri)
1609 .await?;
1610 trace!(?response, "Received response from Tycho server");
1611
1612 let body = response
1613 .text()
1614 .await
1615 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1616
1617 if body.is_empty() {
1618 return Ok(Page::new(vec![], 0, 0, 0));
1620 }
1621
1622 let dto_response = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
1623 .map_err(|err| RPCError::from_parse_error(err, &body))?;
1624 trace!(?dto_response, "Received protocol_states response from Tycho server");
1625
1626 let data: Vec<ProtocolComponentState> = dto_response
1627 .states
1628 .into_iter()
1629 .map(ProtocolComponentState::from)
1630 .collect();
1631 Ok(Page::new(
1632 data,
1633 dto_response.pagination.total,
1634 dto_response.pagination.page,
1635 dto_response.pagination.page_size,
1636 ))
1637 }
1638
1639 async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError> {
1640 let request = TokensRequestBody {
1641 token_addresses: None,
1642 min_quality: params.min_quality,
1643 traded_n_days_ago: params.traded_n_days_ago,
1644 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1645 chain: params.chain.into(),
1646 };
1647
1648 let uri = format!(
1649 "{}/{}/tokens",
1650 self.url
1651 .to_string()
1652 .trim_end_matches('/'),
1653 TYCHO_SERVER_VERSION
1654 );
1655 debug!(%uri, "Sending tokens request to Tycho server");
1656
1657 let response = self
1658 .make_post_request(&request, &uri)
1659 .await?;
1660
1661 let body = response
1662 .text()
1663 .await
1664 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1665 let dto_response = serde_json::from_str::<TokensRequestResponse>(&body)
1666 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1667
1668 let data: Vec<Token> = dto_response
1669 .tokens
1670 .into_iter()
1671 .map(Token::from)
1672 .collect();
1673 Ok(Page::new(
1674 data,
1675 dto_response.pagination.total,
1676 dto_response.pagination.page,
1677 dto_response.pagination.page_size,
1678 ))
1679 }
1680
1681 async fn get_protocol_systems(
1682 &self,
1683 params: ProtocolSystemsParams,
1684 ) -> Result<Page<ProtocolSystems>, RPCError> {
1685 let request = ProtocolSystemsRequestBody {
1686 chain: params.chain.into(),
1687 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1688 };
1689
1690 let uri = format!(
1691 "{}/{}/protocol_systems",
1692 self.url
1693 .to_string()
1694 .trim_end_matches('/'),
1695 TYCHO_SERVER_VERSION
1696 );
1697 debug!(%uri, "Sending protocol_systems request to Tycho server");
1698 trace!(?request, "Sending request to Tycho server");
1699 let response = self
1700 .make_post_request(&request, &uri)
1701 .await?;
1702 trace!(?response, "Received response from Tycho server");
1703 let body = response
1704 .text()
1705 .await
1706 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1707 let dto = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
1708 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1709 trace!(?dto, "Received protocol_systems response from Tycho server");
1710 Ok(Page::new(
1711 ProtocolSystems::new(dto.protocol_systems, dto.dci_protocols),
1712 dto.pagination.total,
1713 dto.pagination.page,
1714 dto.pagination.page_size,
1715 ))
1716 }
1717
1718 async fn get_component_tvl(
1719 &self,
1720 params: ComponentTvlParams,
1721 ) -> Result<Page<HashMap<String, f64>>, RPCError> {
1722 let request = ComponentTvlRequestBody {
1723 chain: params.chain.into(),
1724 protocol_system: params.protocol_system,
1725 component_ids: params.component_ids,
1726 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1727 };
1728
1729 let uri = format!(
1730 "{}/{}/component_tvl",
1731 self.url
1732 .to_string()
1733 .trim_end_matches('/'),
1734 TYCHO_SERVER_VERSION
1735 );
1736 debug!(%uri, "Sending get_component_tvl request to Tycho server");
1737 trace!(?request, "Sending request to Tycho server");
1738 let response = self
1739 .make_post_request(&request, &uri)
1740 .await?;
1741 trace!(?response, "Received response from Tycho server");
1742 let body = response
1743 .text()
1744 .await
1745 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1746 let dto_response =
1747 serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
1748 error!("Failed to parse component_tvl response: {:?}", &body);
1749 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1750 })?;
1751 trace!(?dto_response, "Received component_tvl response from Tycho server");
1752 Ok(Page::new(
1753 dto_response.tvl,
1754 dto_response.pagination.total,
1755 dto_response.pagination.page,
1756 dto_response.pagination.page_size,
1757 ))
1758 }
1759
1760 async fn get_traced_entry_points(
1761 &self,
1762 params: TracedEntryPointsParams,
1763 ) -> Result<Page<TracedEntryPoints>, RPCError> {
1764 let request = TracedEntryPointRequestBody {
1765 chain: params.chain.into(),
1766 protocol_system: params.protocol_system,
1767 component_ids: params.component_ids,
1768 pagination: PaginationParams { page: params.page, page_size: params.page_size },
1769 };
1770
1771 let uri = format!(
1772 "{}/{TYCHO_SERVER_VERSION}/traced_entry_points",
1773 self.url
1774 .to_string()
1775 .trim_end_matches('/')
1776 );
1777 debug!(%uri, "Sending traced_entry_points request to Tycho server");
1778 trace!(?request, "Sending request to Tycho server");
1779
1780 let response = self
1781 .make_post_request(&request, &uri)
1782 .await?;
1783
1784 trace!(?response, "Received response from Tycho server");
1785
1786 let body = response
1787 .text()
1788 .await
1789 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1790 let dto_response =
1791 serde_json::from_str::<TracedEntryPointRequestResponse>(&body).map_err(|err| {
1792 error!("Failed to parse traced_entry_points response: {:?}", &body);
1793 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1794 })?;
1795 trace!(?dto_response, "Received traced_entry_points response from Tycho server");
1796 let data: TracedEntryPoints = dto_response
1797 .traced_entry_points
1798 .into_iter()
1799 .map(|(k, v)| {
1800 (
1801 k,
1802 v.into_iter()
1803 .map(|(ep, tr)| {
1804 (EntryPointWithTracingParams::from(ep), TracingResult::from(tr))
1805 })
1806 .collect(),
1807 )
1808 })
1809 .collect();
1810 Ok(Page::new(
1811 data,
1812 dto_response.pagination.total,
1813 dto_response.pagination.page,
1814 dto_response.pagination.page_size,
1815 ))
1816 }
1817
1818 async fn get_snapshots<'a>(
1819 &self,
1820 request: &SnapshotParameters<'a>,
1821 chunk_size: Option<usize>,
1822 concurrency: usize,
1823 ) -> Result<Snapshot, RPCError> {
1824 let component_ids: Vec<_> = request
1825 .components
1826 .keys()
1827 .cloned()
1828 .collect();
1829
1830 let component_tvl = if request.include_tvl && !component_ids.is_empty() {
1831 self.get_component_tvl_paginated(
1832 ComponentTvlPaginatedParams::new(request.chain, concurrency)
1833 .with_component_ids(component_ids.clone()),
1834 )
1835 .await?
1836 } else {
1837 HashMap::new()
1838 };
1839
1840 let version = VersionParam::at_block(request.chain.into(), request.block_number);
1841
1842 let mut protocol_states = if !component_ids.is_empty() {
1843 self.get_protocol_states_paginated(
1844 ProtocolStatesPaginatedParams::new(
1845 request.chain,
1846 request.protocol_system,
1847 concurrency,
1848 )
1849 .with_protocol_ids(component_ids.clone())
1850 .with_include_balances(request.include_balances)
1851 .with_version(version.clone()),
1852 )
1853 .await?
1854 .into_iter()
1855 .map(|state| (state.component_id.clone(), state))
1856 .collect()
1857 } else {
1858 HashMap::new()
1859 };
1860
1861 let states = request
1863 .components
1864 .values()
1865 .filter_map(|component| {
1866 if let Some(state) = protocol_states.remove(&component.id) {
1867 Some((
1868 component.id.clone(),
1869 ComponentWithState {
1870 state,
1871 component: component.clone(),
1872 component_tvl: component_tvl
1873 .get(&component.id)
1874 .cloned(),
1875 entrypoints: request
1876 .entrypoints
1877 .as_ref()
1878 .and_then(|map| map.get(&component.id))
1879 .cloned()
1880 .unwrap_or_default(),
1881 },
1882 ))
1883 } else if component_ids.contains(&component.id) {
1884 let component_id = &component.id;
1886 error!(?component_id, "Missing state for native component!");
1887 None
1888 } else {
1889 None
1890 }
1891 })
1892 .collect();
1893
1894 let vm_storage = if !request.contract_ids.is_empty() {
1895 let mut cp_params = ContractStatePaginatedParams::new(
1896 request.chain,
1897 request.protocol_system,
1898 concurrency,
1899 )
1900 .with_contract_ids(request.contract_ids.to_vec())
1901 .with_version(version.clone());
1902 if let Some(cs) = chunk_size {
1903 cp_params = cp_params.with_chunk_size(cs);
1904 }
1905 let contract_states = self
1906 .get_contract_state_paginated(cp_params)
1907 .await?
1908 .into_iter()
1909 .map(|acc| (acc.address.clone(), acc))
1910 .collect::<HashMap<_, _>>();
1911
1912 trace!(states=?&contract_states, "Retrieved ContractState");
1913
1914 let contract_address_to_components = request
1915 .components
1916 .iter()
1917 .filter_map(|(id, comp)| {
1918 if component_ids.contains(id) {
1919 Some(
1920 comp.contract_addresses
1921 .iter()
1922 .map(|address| (address.clone(), comp.id.clone())),
1923 )
1924 } else {
1925 None
1926 }
1927 })
1928 .flatten()
1929 .fold(HashMap::<Bytes, Vec<String>>::new(), |mut acc, (addr, c_id)| {
1930 acc.entry(addr).or_default().push(c_id);
1931 acc
1932 });
1933
1934 request
1935 .contract_ids
1936 .iter()
1937 .filter_map(|address| {
1938 if let Some(state) = contract_states.get(address) {
1939 Some((address.clone(), state.clone()))
1940 } else if let Some(ids) = contract_address_to_components.get(address) {
1941 error!(
1943 ?address,
1944 ?ids,
1945 "Component with lacking contract storage encountered!"
1946 );
1947 None
1948 } else {
1949 None
1950 }
1951 })
1952 .collect()
1953 } else {
1954 HashMap::new()
1955 };
1956
1957 Ok(Snapshot { states, vm_storage })
1958 }
1959}
1960
1961#[cfg(test)]
1962mod tests {
1963 use std::{
1964 collections::{HashMap, HashSet},
1965 str::FromStr,
1966 };
1967
1968 use mockito::Server;
1969 use rstest::rstest;
1970 use tycho_common::models::blockchain::AddressStorageLocation;
1971
1972 use super::*;
1973
1974 impl MockRPCClient {
1977 #[allow(clippy::too_many_arguments)]
1978 async fn test_get_protocol_states_paginated<T>(
1979 &self,
1980 chain: Chain,
1981 ids: &[T],
1982 protocol_system: &str,
1983 include_balances: bool,
1984 block_number: Option<u64>,
1985 chunk_size: usize,
1986 _concurrency: usize,
1987 ) -> Vec<(Chain, Vec<String>, String, bool, Option<u64>, PaginationParams)>
1988 where
1989 T: AsRef<str> + Clone + Send + Sync + 'static,
1990 {
1991 ids.chunks(chunk_size)
1992 .map(|chunk| {
1993 (
1994 chain,
1995 chunk
1996 .iter()
1997 .map(|id| id.as_ref().to_string())
1998 .collect(),
1999 protocol_system.to_string(),
2000 include_balances,
2001 block_number,
2002 PaginationParams { page: 0, page_size: chunk_size as i64 },
2003 )
2004 })
2005 .collect()
2006 }
2007 }
2008
2009 const GET_CONTRACT_STATE_RESP: &str = r#"
2010 {
2011 "accounts": [
2012 {
2013 "chain": "ethereum",
2014 "address": "0x0000000000000000000000000000000000000000",
2015 "title": "",
2016 "slots": {},
2017 "native_balance": "0x01f4",
2018 "token_balances": {},
2019 "code": "0x00",
2020 "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
2021 "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2022 "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2023 "creation_tx": null
2024 }
2025 ],
2026 "pagination": {
2027 "page": 0,
2028 "page_size": 20,
2029 "total": 10
2030 }
2031 }
2032 "#;
2033
2034 #[rstest]
2035 #[case::string_input(vec![
2036 "id1".to_string(),
2037 "id2".to_string()
2038 ])]
2039 #[tokio::test]
2040 async fn test_get_protocol_states_paginated<T>(#[case] ids: Vec<T>)
2041 where
2042 T: AsRef<str> + Clone + Send + Sync + 'static,
2043 {
2044 let mock_client = MockRPCClient::new();
2045
2046 let request_args = mock_client
2047 .test_get_protocol_states_paginated(
2048 Chain::Ethereum,
2049 &ids,
2050 "test_system",
2051 true,
2052 None,
2053 2,
2054 2,
2055 )
2056 .await;
2057
2058 assert_eq!(request_args.len(), 1);
2060 assert_eq!(request_args[0].1.len(), 2);
2061 }
2062
2063 #[tokio::test]
2064 async fn test_get_contract_state() {
2065 let mut server = Server::new_async().await;
2066 let server_resp = GET_CONTRACT_STATE_RESP;
2067 serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
2069
2070 let mocked_server = server
2071 .mock("POST", "/v1/contract_state")
2072 .expect(1)
2073 .with_body(server_resp)
2074 .create_async()
2075 .await;
2076
2077 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2078 .expect("create client");
2079
2080 let accounts = client
2081 .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
2082 .await
2083 .expect("get state");
2084
2085 mocked_server.assert();
2086 assert_eq!(accounts.data().len(), 1);
2087 assert_eq!(accounts.data()[0].slots, HashMap::new());
2088 assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
2089 assert_eq!(accounts.data()[0].code, [0].to_vec());
2090 assert_eq!(
2091 accounts.data()[0].code_hash,
2092 hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
2093 .unwrap()
2094 );
2095 }
2096
2097 #[tokio::test]
2098 async fn test_get_protocol_components() {
2099 let mut server = Server::new_async().await;
2100 let server_resp = r#"
2101 {
2102 "protocol_components": [
2103 {
2104 "id": "State1",
2105 "protocol_system": "ambient",
2106 "protocol_type_name": "Pool",
2107 "chain": "ethereum",
2108 "tokens": [
2109 "0x0000000000000000000000000000000000000000",
2110 "0x0000000000000000000000000000000000000001"
2111 ],
2112 "contract_ids": [
2113 "0x0000000000000000000000000000000000000000"
2114 ],
2115 "static_attributes": {
2116 "attribute_1": "0x00000000000003e8"
2117 },
2118 "change": "Creation",
2119 "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2120 "created_at": "2022-01-01T00:00:00"
2121 }
2122 ],
2123 "pagination": {
2124 "page": 0,
2125 "page_size": 20,
2126 "total": 10
2127 }
2128 }
2129 "#;
2130 serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
2132
2133 let mocked_server = server
2134 .mock("POST", "/v1/protocol_components")
2135 .expect(1)
2136 .with_body(server_resp)
2137 .create_async()
2138 .await;
2139
2140 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2141 .expect("create client");
2142
2143 let components = client
2144 .get_protocol_components(ProtocolComponentsParams::new(Chain::Ethereum, ""))
2145 .await
2146 .expect("get state");
2147
2148 mocked_server.assert();
2149 assert_eq!(components.data().len(), 1);
2150 assert_eq!(components.data()[0].id, "State1");
2151 assert_eq!(components.data()[0].protocol_system, "ambient");
2152 assert_eq!(components.data()[0].protocol_type_name, "Pool");
2153 assert_eq!(components.data()[0].tokens.len(), 2);
2154 let expected_attributes =
2155 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2156 .iter()
2157 .cloned()
2158 .collect::<HashMap<String, Bytes>>();
2159 assert_eq!(components.data()[0].static_attributes, expected_attributes);
2160 }
2161
2162 #[tokio::test]
2163 async fn test_get_protocol_states() {
2164 let mut server = Server::new_async().await;
2165 let server_resp = r#"
2166 {
2167 "states": [
2168 {
2169 "component_id": "State1",
2170 "attributes": {
2171 "attribute_1": "0x00000000000003e8"
2172 },
2173 "balances": {
2174 "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
2175 }
2176 }
2177 ],
2178 "pagination": {
2179 "page": 0,
2180 "page_size": 20,
2181 "total": 10
2182 }
2183 }
2184 "#;
2185 serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
2187
2188 let mocked_server = server
2189 .mock("POST", "/v1/protocol_state")
2190 .expect(1)
2191 .with_body(server_resp)
2192 .create_async()
2193 .await;
2194 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2195 .expect("create client");
2196
2197 let states = client
2198 .get_protocol_states(
2199 ProtocolStatesParams::new(Chain::Ethereum, "").with_include_balances(true),
2200 )
2201 .await
2202 .expect("get state");
2203
2204 mocked_server.assert();
2205 assert_eq!(states.data().len(), 1);
2206 assert_eq!(states.data()[0].component_id, "State1");
2207 let expected_attributes =
2208 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2209 .iter()
2210 .cloned()
2211 .collect::<HashMap<String, Bytes>>();
2212 assert_eq!(states.data()[0].attributes, expected_attributes);
2213 let expected_balances = [(
2214 Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
2215 .expect("Unsupported address format"),
2216 Bytes::from_str("0x01f4").unwrap(),
2217 )]
2218 .iter()
2219 .cloned()
2220 .collect::<HashMap<Bytes, Bytes>>();
2221 assert_eq!(states.data()[0].balances, expected_balances);
2222 }
2223
2224 #[tokio::test]
2225 async fn test_get_tokens() {
2226 let mut server = Server::new_async().await;
2227 let server_resp = r#"
2228 {
2229 "tokens": [
2230 {
2231 "chain": "ethereum",
2232 "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
2233 "symbol": "WETH",
2234 "decimals": 18,
2235 "tax": 0,
2236 "gas": [
2237 29962
2238 ],
2239 "quality": 100
2240 },
2241 {
2242 "chain": "ethereum",
2243 "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
2244 "symbol": "USDC",
2245 "decimals": 6,
2246 "tax": 0,
2247 "gas": [
2248 40652
2249 ],
2250 "quality": 100
2251 }
2252 ],
2253 "pagination": {
2254 "page": 0,
2255 "page_size": 20,
2256 "total": 10
2257 }
2258 }
2259 "#;
2260 serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
2262
2263 let mocked_server = server
2264 .mock("POST", "/v1/tokens")
2265 .expect(1)
2266 .with_body(server_resp)
2267 .create_async()
2268 .await;
2269 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2270 .expect("create client");
2271
2272 let tokens = client
2273 .get_tokens(TokensParams::new(Chain::Ethereum))
2274 .await
2275 .expect("get tokens");
2276
2277 let expected = vec![
2278 Token {
2279 chain: tycho_common::models::Chain::Ethereum,
2280 address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
2281 symbol: "WETH".to_string(),
2282 decimals: 18,
2283 tax: 0,
2284 gas: vec![Some(29962)],
2285 quality: 100,
2286 },
2287 Token {
2288 chain: tycho_common::models::Chain::Ethereum,
2289 address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
2290 symbol: "USDC".to_string(),
2291 decimals: 6,
2292 tax: 0,
2293 gas: vec![Some(40652)],
2294 quality: 100,
2295 },
2296 ];
2297
2298 mocked_server.assert();
2299 assert_eq!(*tokens.data(), expected);
2300 }
2301
2302 #[rstest]
2303 #[case::with_dci(Some(vec!["system2"]), vec!["system2"])]
2304 #[case::backward_compat(None, vec![])]
2305 #[tokio::test]
2306 async fn test_get_protocol_systems(
2307 #[case] dci_protocols: Option<Vec<&str>>,
2308 #[case] expected_dci: Vec<&str>,
2309 ) {
2310 use serde_json::json;
2311
2312 let mut json_value = json!({
2313 "protocol_systems": ["system1", "system2"],
2314 "pagination": { "page": 0, "page_size": 20, "total": 2 }
2315 });
2316 if let Some(dci) = dci_protocols {
2317 json_value["dci_protocols"] = json!(dci);
2318 }
2319 let server_resp = serde_json::to_string(&json_value).unwrap();
2320
2321 let mut server = Server::new_async().await;
2322 let mocked_server = server
2323 .mock("POST", "/v1/protocol_systems")
2324 .expect(1)
2325 .with_body(&server_resp)
2326 .create_async()
2327 .await;
2328 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2329 .expect("create client");
2330
2331 let response = client
2332 .get_protocol_systems(ProtocolSystemsParams::new(Chain::Ethereum))
2333 .await
2334 .expect("get protocol systems");
2335
2336 mocked_server.assert();
2337 assert_eq!(response.data().protocol_systems(), ["system1", "system2"]);
2338 assert_eq!(response.data().dci_protocols(), expected_dci.as_slice());
2339 }
2340
2341 #[tokio::test]
2342 async fn test_get_component_tvl() {
2343 let mut server = Server::new_async().await;
2344 let server_resp = r#"
2345 {
2346 "tvl": {
2347 "component1": 100.0
2348 },
2349 "pagination": {
2350 "page": 0,
2351 "page_size": 20,
2352 "total": 10
2353 }
2354 }
2355 "#;
2356 serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
2358
2359 let mocked_server = server
2360 .mock("POST", "/v1/component_tvl")
2361 .expect(1)
2362 .with_body(server_resp)
2363 .create_async()
2364 .await;
2365 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2366 .expect("create client");
2367
2368 let component_tvl = client
2369 .get_component_tvl(ComponentTvlParams::new(Chain::Ethereum))
2370 .await
2371 .expect("get component tvl");
2372
2373 mocked_server.assert();
2374 assert_eq!(component_tvl.data().get("component1"), Some(&100.0));
2375 }
2376
2377 #[tokio::test]
2378 async fn test_get_traced_entry_points() {
2379 let mut server = Server::new_async().await;
2380 let server_resp = r#"
2381 {
2382 "traced_entry_points": {
2383 "component_1": [
2384 [
2385 {
2386 "entry_point": {
2387 "external_id": "entrypoint_a",
2388 "target": "0x0000000000000000000000000000000000000001",
2389 "signature": "sig()"
2390 },
2391 "params": {
2392 "method": "rpctracer",
2393 "caller": "0x000000000000000000000000000000000000000a",
2394 "calldata": "0x000000000000000000000000000000000000000b"
2395 }
2396 },
2397 {
2398 "retriggers": [
2399 [
2400 "0x00000000000000000000000000000000000000aa",
2401 {"key": "0x0000000000000000000000000000000000000aaa", "offset": 12}
2402 ]
2403 ],
2404 "accessed_slots": {
2405 "0x0000000000000000000000000000000000aaaa": [
2406 "0x0000000000000000000000000000000000aaaa"
2407 ]
2408 }
2409 }
2410 ]
2411 ]
2412 },
2413 "pagination": {
2414 "page": 0,
2415 "page_size": 20,
2416 "total": 1
2417 }
2418 }
2419 "#;
2420 serde_json::from_str::<TracedEntryPointRequestResponse>(server_resp).expect("deserialize");
2422
2423 let mocked_server = server
2424 .mock("POST", "/v1/traced_entry_points")
2425 .expect(1)
2426 .with_body(server_resp)
2427 .create_async()
2428 .await;
2429 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2430 .expect("create client");
2431
2432 let entrypoints = client
2433 .get_traced_entry_points(TracedEntryPointsParams::new(Chain::Ethereum, ""))
2434 .await
2435 .expect("get traced entry points");
2436
2437 mocked_server.assert();
2438 assert_eq!(entrypoints.data().len(), 1);
2439 let comp1_entrypoints = entrypoints
2440 .data()
2441 .get("component_1")
2442 .expect("component_1 entrypoints should exist");
2443 assert_eq!(comp1_entrypoints.len(), 1);
2444
2445 let (entrypoint, trace_result) = &comp1_entrypoints[0];
2446 assert_eq!(entrypoint.entry_point.external_id, "entrypoint_a");
2447 assert_eq!(
2448 entrypoint.entry_point.target,
2449 Bytes::from_str("0x0000000000000000000000000000000000000001").unwrap()
2450 );
2451 assert_eq!(entrypoint.entry_point.signature, "sig()");
2452 let tycho_common::models::blockchain::TracingParams::RPCTracer(rpc_params) =
2453 &entrypoint.params;
2454 assert_eq!(
2455 rpc_params.caller,
2456 Some(Bytes::from("0x000000000000000000000000000000000000000a"))
2457 );
2458 assert_eq!(rpc_params.calldata, Bytes::from("0x000000000000000000000000000000000000000b"));
2459
2460 assert_eq!(
2461 trace_result.retriggers,
2462 HashSet::from([(
2463 Bytes::from("0x00000000000000000000000000000000000000aa"),
2464 AddressStorageLocation::new(
2465 Bytes::from("0x0000000000000000000000000000000000000aaa"),
2466 12
2467 )
2468 )])
2469 );
2470 assert_eq!(trace_result.accessed_slots.len(), 1);
2471 assert_eq!(
2472 trace_result.accessed_slots,
2473 HashMap::from([(
2474 Bytes::from("0x0000000000000000000000000000000000aaaa"),
2475 HashSet::from([Bytes::from("0x0000000000000000000000000000000000aaaa")])
2476 )])
2477 );
2478 }
2479
2480 #[tokio::test]
2481 async fn test_parse_retry_value_numeric() {
2482 let result = parse_retry_value("60");
2483 assert!(result.is_some());
2484
2485 let expected_time = SystemTime::now() + Duration::from_secs(60);
2486 let actual_time = result.unwrap();
2487
2488 let diff = if actual_time > expected_time {
2490 actual_time
2491 .duration_since(expected_time)
2492 .unwrap()
2493 } else {
2494 expected_time
2495 .duration_since(actual_time)
2496 .unwrap()
2497 };
2498 assert!(diff < Duration::from_secs(1), "Time difference too large: {:?}", diff);
2499 }
2500
2501 #[tokio::test]
2502 async fn test_parse_retry_value_rfc2822() {
2503 let rfc2822_date = "Sat, 01 Jan 2030 12:00:00 +0000";
2505 let result = parse_retry_value(rfc2822_date);
2506 assert!(result.is_some());
2507
2508 let parsed_time = result.unwrap();
2509 assert!(parsed_time > SystemTime::now());
2510 }
2511
2512 #[tokio::test]
2513 async fn test_parse_retry_value_invalid_formats() {
2514 assert!(parse_retry_value("invalid").is_none());
2516 assert!(parse_retry_value("").is_none());
2517 assert!(parse_retry_value("not_a_number").is_none());
2518 assert!(parse_retry_value("Mon, 32 Jan 2030 25:00:00 +0000").is_none());
2519 }
2521
2522 #[tokio::test]
2523 async fn test_parse_retry_value_zero_seconds() {
2524 let result = parse_retry_value("0");
2525 assert!(result.is_some());
2526
2527 let expected_time = SystemTime::now();
2528 let actual_time = result.unwrap();
2529
2530 let diff = if actual_time > expected_time {
2532 actual_time
2533 .duration_since(expected_time)
2534 .unwrap()
2535 } else {
2536 expected_time
2537 .duration_since(actual_time)
2538 .unwrap()
2539 };
2540 assert!(diff < Duration::from_secs(1));
2541 }
2542
2543 #[tokio::test]
2544 async fn test_error_for_response_rate_limited() {
2545 let mut server = Server::new_async().await;
2546 let mock = server
2547 .mock("GET", "/test")
2548 .with_status(429)
2549 .with_header("Retry-After", "60")
2550 .create_async()
2551 .await;
2552
2553 let client = reqwest::Client::new();
2554 let response = client
2555 .get(format!("{}/test", server.url()))
2556 .send()
2557 .await
2558 .unwrap();
2559
2560 let http_client =
2561 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2562 .unwrap()
2563 .with_test_backoff_policy();
2564 let result = http_client
2565 .error_for_response(response)
2566 .await;
2567
2568 mock.assert();
2569 assert!(matches!(result, Err(RPCError::RateLimited(_))));
2570 if let Err(RPCError::RateLimited(retry_after)) = result {
2571 assert!(retry_after.is_some());
2572 }
2573 }
2574
2575 #[tokio::test]
2576 async fn test_error_for_response_rate_limited_no_header() {
2577 let mut server = Server::new_async().await;
2578 let mock = server
2579 .mock("GET", "/test")
2580 .with_status(429)
2581 .create_async()
2582 .await;
2583
2584 let client = reqwest::Client::new();
2585 let response = client
2586 .get(format!("{}/test", server.url()))
2587 .send()
2588 .await
2589 .unwrap();
2590
2591 let http_client =
2592 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2593 .unwrap()
2594 .with_test_backoff_policy();
2595 let result = http_client
2596 .error_for_response(response)
2597 .await;
2598
2599 mock.assert();
2600 assert!(matches!(result, Err(RPCError::RateLimited(None))));
2601 }
2602
2603 #[tokio::test]
2604 async fn test_error_for_response_server_errors() {
2605 let test_cases =
2606 vec![(502, "Bad Gateway"), (503, "Service Unavailable"), (504, "Gateway Timeout")];
2607
2608 for (status_code, expected_body) in test_cases {
2609 let mut server = Server::new_async().await;
2610 let mock = server
2611 .mock("GET", "/test")
2612 .with_status(status_code)
2613 .with_body(expected_body)
2614 .create_async()
2615 .await;
2616
2617 let client = reqwest::Client::new();
2618 let response = client
2619 .get(format!("{}/test", server.url()))
2620 .send()
2621 .await
2622 .unwrap();
2623
2624 let http_client =
2625 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2626 .unwrap()
2627 .with_test_backoff_policy();
2628 let result = http_client
2629 .error_for_response(response)
2630 .await;
2631
2632 mock.assert();
2633 assert!(matches!(result, Err(RPCError::ServerUnreachable(_))));
2634 if let Err(RPCError::ServerUnreachable(body)) = result {
2635 assert_eq!(body, expected_body);
2636 }
2637 }
2638 }
2639
2640 #[tokio::test]
2641 async fn test_error_for_response_success() {
2642 let mut server = Server::new_async().await;
2643 let mock = server
2644 .mock("GET", "/test")
2645 .with_status(200)
2646 .with_body("success")
2647 .create_async()
2648 .await;
2649
2650 let client = reqwest::Client::new();
2651 let response = client
2652 .get(format!("{}/test", server.url()))
2653 .send()
2654 .await
2655 .unwrap();
2656
2657 let http_client =
2658 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2659 .unwrap()
2660 .with_test_backoff_policy();
2661 let result = http_client
2662 .error_for_response(response)
2663 .await;
2664
2665 mock.assert();
2666 assert!(result.is_ok());
2667
2668 let response = result.unwrap();
2669 assert_eq!(response.status(), 200);
2670 }
2671
2672 #[tokio::test]
2673 async fn test_handle_error_for_backoff_server_unreachable() {
2674 let http_client =
2675 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2676 .unwrap()
2677 .with_test_backoff_policy();
2678 let error = RPCError::ServerUnreachable("Service down".to_string());
2679
2680 let backoff_error = http_client
2681 .handle_error_for_backoff(error)
2682 .await;
2683
2684 match backoff_error {
2685 backoff::Error::Transient { err: RPCError::ServerUnreachable(msg), retry_after } => {
2686 assert_eq!(msg, "Service down");
2687 assert_eq!(retry_after, Some(Duration::from_millis(50))); }
2689 _ => panic!("Expected transient error for ServerUnreachable"),
2690 }
2691 }
2692
2693 #[tokio::test]
2694 async fn test_handle_error_for_backoff_rate_limited_with_retry_after() {
2695 let http_client =
2696 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2697 .unwrap()
2698 .with_test_backoff_policy();
2699 let future_time = SystemTime::now() + Duration::from_secs(30);
2700 let error = RPCError::RateLimited(Some(future_time));
2701
2702 let backoff_error = http_client
2703 .handle_error_for_backoff(error)
2704 .await;
2705
2706 match backoff_error {
2707 backoff::Error::Transient { err: RPCError::RateLimited(retry_after), .. } => {
2708 assert_eq!(retry_after, Some(future_time));
2709 }
2710 _ => panic!("Expected transient error for RateLimited"),
2711 }
2712
2713 let stored_retry_after = http_client.retry_after.read().await;
2715 assert_eq!(*stored_retry_after, Some(future_time));
2716 }
2717
2718 #[tokio::test]
2719 async fn test_handle_error_for_backoff_rate_limited_no_retry_after() {
2720 let http_client =
2721 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2722 .unwrap()
2723 .with_test_backoff_policy();
2724 let error = RPCError::RateLimited(None);
2725
2726 let backoff_error = http_client
2727 .handle_error_for_backoff(error)
2728 .await;
2729
2730 match backoff_error {
2731 backoff::Error::Transient { err: RPCError::RateLimited(None), .. } => {
2732 }
2734 _ => panic!("Expected transient error for RateLimited without retry-after"),
2735 }
2736 }
2737
2738 #[tokio::test]
2739 async fn test_handle_error_for_backoff_other_errors() {
2740 let http_client =
2741 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2742 .unwrap()
2743 .with_test_backoff_policy();
2744 let error = RPCError::ParseResponse("Invalid JSON".to_string());
2745
2746 let backoff_error = http_client
2747 .handle_error_for_backoff(error)
2748 .await;
2749
2750 match backoff_error {
2751 backoff::Error::Permanent(RPCError::ParseResponse(msg)) => {
2752 assert_eq!(msg, "Invalid JSON");
2753 }
2754 _ => panic!("Expected permanent error for ParseResponse"),
2755 }
2756 }
2757
2758 #[tokio::test]
2759 async fn test_wait_until_retry_after_no_retry_time() {
2760 let http_client =
2761 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2762 .unwrap()
2763 .with_test_backoff_policy();
2764
2765 let start = std::time::Instant::now();
2766 http_client
2767 .wait_until_retry_after()
2768 .await;
2769 let elapsed = start.elapsed();
2770
2771 assert!(elapsed < Duration::from_millis(100));
2773 }
2774
2775 #[tokio::test]
2776 async fn test_wait_until_retry_after_past_time() {
2777 let http_client =
2778 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2779 .unwrap()
2780 .with_test_backoff_policy();
2781
2782 let past_time = SystemTime::now() - Duration::from_secs(10);
2784 *http_client.retry_after.write().await = Some(past_time);
2785
2786 let start = std::time::Instant::now();
2787 http_client
2788 .wait_until_retry_after()
2789 .await;
2790 let elapsed = start.elapsed();
2791
2792 assert!(elapsed < Duration::from_millis(100));
2794 }
2795
2796 #[tokio::test]
2797 async fn test_wait_until_retry_after_future_time() {
2798 let http_client =
2799 HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2800 .unwrap()
2801 .with_test_backoff_policy();
2802
2803 let future_time = SystemTime::now() + Duration::from_millis(100);
2805 *http_client.retry_after.write().await = Some(future_time);
2806
2807 let start = std::time::Instant::now();
2808 http_client
2809 .wait_until_retry_after()
2810 .await;
2811 let elapsed = start.elapsed();
2812
2813 assert!(elapsed >= Duration::from_millis(80)); assert!(elapsed <= Duration::from_millis(200)); }
2817
2818 #[tokio::test]
2819 async fn test_make_post_request_success() {
2820 let mut server = Server::new_async().await;
2821 let server_resp = r#"{"success": true}"#;
2822
2823 let mock = server
2824 .mock("POST", "/test")
2825 .with_status(200)
2826 .with_body(server_resp)
2827 .create_async()
2828 .await;
2829
2830 let http_client =
2831 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2832 .unwrap()
2833 .with_test_backoff_policy();
2834 let request_body = serde_json::json!({"test": "data"});
2835 let uri = format!("{}/test", server.url());
2836
2837 let result = http_client
2838 .make_post_request(&request_body, &uri)
2839 .await;
2840
2841 mock.assert();
2842 assert!(result.is_ok());
2843
2844 let response = result.unwrap();
2845 assert_eq!(response.status(), 200);
2846 assert_eq!(response.text().await.unwrap(), server_resp);
2847 }
2848
2849 #[tokio::test]
2850 async fn test_make_post_request_retry_on_server_error() {
2851 let mut server = Server::new_async().await;
2852 let error_mock = server
2854 .mock("POST", "/test")
2855 .with_status(503)
2856 .with_body("Service Unavailable")
2857 .expect(1)
2858 .create_async()
2859 .await;
2860
2861 let success_mock = server
2862 .mock("POST", "/test")
2863 .with_status(200)
2864 .with_body(r#"{"success": true}"#)
2865 .expect(1)
2866 .create_async()
2867 .await;
2868
2869 let http_client =
2870 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2871 .unwrap()
2872 .with_test_backoff_policy();
2873 let request_body = serde_json::json!({"test": "data"});
2874 let uri = format!("{}/test", server.url());
2875
2876 let result = http_client
2877 .make_post_request(&request_body, &uri)
2878 .await;
2879
2880 error_mock.assert();
2881 success_mock.assert();
2882 assert!(result.is_ok());
2883 }
2884
2885 #[tokio::test]
2886 async fn test_make_post_request_respect_retry_after_header() {
2887 let mut server = Server::new_async().await;
2888
2889 let rate_limit_mock = server
2891 .mock("POST", "/test")
2892 .with_status(429)
2893 .with_header("Retry-After", "1") .expect(1)
2895 .create_async()
2896 .await;
2897
2898 let success_mock = server
2899 .mock("POST", "/test")
2900 .with_status(200)
2901 .with_body(r#"{"success": true}"#)
2902 .expect(1)
2903 .create_async()
2904 .await;
2905
2906 let http_client =
2907 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2908 .unwrap()
2909 .with_test_backoff_policy();
2910 let request_body = serde_json::json!({"test": "data"});
2911 let uri = format!("{}/test", server.url());
2912
2913 let start = std::time::Instant::now();
2914 let result = http_client
2915 .make_post_request(&request_body, &uri)
2916 .await;
2917 let elapsed = start.elapsed();
2918
2919 rate_limit_mock.assert();
2920 success_mock.assert();
2921 assert!(result.is_ok());
2922
2923 assert!(elapsed >= Duration::from_millis(900)); assert!(elapsed <= Duration::from_millis(2000)); }
2927
2928 #[tokio::test]
2929 async fn test_make_post_request_permanent_error() {
2930 let mut server = Server::new_async().await;
2931
2932 let mock = server
2933 .mock("POST", "/test")
2934 .with_status(400) .with_body("Bad Request")
2936 .expect(1)
2937 .create_async()
2938 .await;
2939
2940 let http_client =
2941 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2942 .unwrap()
2943 .with_test_backoff_policy();
2944 let request_body = serde_json::json!({"test": "data"});
2945 let uri = format!("{}/test", server.url());
2946
2947 let result = http_client
2948 .make_post_request(&request_body, &uri)
2949 .await;
2950
2951 mock.assert();
2952 assert!(result.is_ok()); let response = result.unwrap();
2955 assert_eq!(response.status(), 400);
2956 }
2957
2958 #[tokio::test]
2959 async fn test_concurrent_requests_with_different_retry_after() {
2960 let mut server = Server::new_async().await;
2961
2962 let rate_limit_mock_1 = server
2964 .mock("POST", "/test1")
2965 .with_status(429)
2966 .with_header("Retry-After", "1")
2967 .expect(1)
2968 .create_async()
2969 .await;
2970
2971 let rate_limit_mock_2 = server
2973 .mock("POST", "/test2")
2974 .with_status(429)
2975 .with_header("Retry-After", "2")
2976 .expect(1)
2977 .create_async()
2978 .await;
2979
2980 let success_mock_1 = server
2982 .mock("POST", "/test1")
2983 .with_status(200)
2984 .with_body(r#"{"result": "success1"}"#)
2985 .expect(1)
2986 .create_async()
2987 .await;
2988
2989 let success_mock_2 = server
2990 .mock("POST", "/test2")
2991 .with_status(200)
2992 .with_body(r#"{"result": "success2"}"#)
2993 .expect(1)
2994 .create_async()
2995 .await;
2996
2997 let http_client =
2998 HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2999 .unwrap()
3000 .with_test_backoff_policy();
3001 let request_body = serde_json::json!({"test": "data"});
3002
3003 let uri1 = format!("{}/test1", server.url());
3004 let uri2 = format!("{}/test2", server.url());
3005
3006 let start = std::time::Instant::now();
3008 let (result1, result2) = tokio::join!(
3009 http_client.make_post_request(&request_body, &uri1),
3010 http_client.make_post_request(&request_body, &uri2)
3011 );
3012 let elapsed = start.elapsed();
3013
3014 rate_limit_mock_1.assert();
3015 rate_limit_mock_2.assert();
3016 success_mock_1.assert();
3017 success_mock_2.assert();
3018
3019 assert!(result1.is_ok());
3020 assert!(result2.is_ok());
3021
3022 assert!(elapsed >= Duration::from_millis(1800)); assert!(elapsed <= Duration::from_millis(3000)); let final_retry_after = http_client.retry_after.read().await;
3030 assert!(final_retry_after.is_some());
3031
3032 if let Some(retry_time) = *final_retry_after {
3034 let now = SystemTime::now();
3037 let diff = if retry_time > now {
3038 retry_time.duration_since(now).unwrap()
3039 } else {
3040 now.duration_since(retry_time).unwrap()
3041 };
3042
3043 assert!(diff <= Duration::from_secs(3), "Retry time difference too large: {:?}", diff);
3045 }
3046 }
3047
3048 #[tokio::test]
3049 async fn test_get_snapshots() {
3050 let mut server = Server::new_async().await;
3051
3052 let protocol_states_resp = r#"
3054 {
3055 "states": [
3056 {
3057 "component_id": "component1",
3058 "attributes": {
3059 "attribute_1": "0x00000000000003e8"
3060 },
3061 "balances": {
3062 "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
3063 }
3064 }
3065 ],
3066 "pagination": {
3067 "page": 0,
3068 "page_size": 100,
3069 "total": 1
3070 }
3071 }
3072 "#;
3073
3074 let contract_state_resp = r#"
3076 {
3077 "accounts": [
3078 {
3079 "chain": "ethereum",
3080 "address": "0x1111111111111111111111111111111111111111",
3081 "title": "",
3082 "slots": {},
3083 "native_balance": "0x01f4",
3084 "token_balances": {},
3085 "code": "0x00",
3086 "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
3087 "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3088 "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3089 "creation_tx": null
3090 }
3091 ],
3092 "pagination": {
3093 "page": 0,
3094 "page_size": 100,
3095 "total": 1
3096 }
3097 }
3098 "#;
3099
3100 let tvl_resp = r#"
3102 {
3103 "tvl": {
3104 "component1": 1000000.0
3105 },
3106 "pagination": {
3107 "page": 0,
3108 "page_size": 100,
3109 "total": 1
3110 }
3111 }
3112 "#;
3113
3114 let protocol_states_mock = server
3115 .mock("POST", "/v1/protocol_state")
3116 .expect(1)
3117 .with_body(protocol_states_resp)
3118 .create_async()
3119 .await;
3120
3121 let contract_state_mock = server
3122 .mock("POST", "/v1/contract_state")
3123 .expect(1)
3124 .with_body(contract_state_resp)
3125 .create_async()
3126 .await;
3127
3128 let tvl_mock = server
3129 .mock("POST", "/v1/component_tvl")
3130 .expect(1)
3131 .with_body(tvl_resp)
3132 .create_async()
3133 .await;
3134
3135 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3136 .expect("create client");
3137
3138 let component = tycho_common::models::protocol::ProtocolComponent {
3139 id: "component1".to_string(),
3140 protocol_system: "test_protocol".to_string(),
3141 protocol_type_name: "test_type".to_string(),
3142 chain: Chain::Ethereum,
3143 tokens: vec![],
3144 contract_addresses: vec![
3145 Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()
3146 ],
3147 static_attributes: HashMap::new(),
3148 change: tycho_common::models::ChangeType::Creation,
3149 creation_tx: Bytes::from_str(
3150 "0x0000000000000000000000000000000000000000000000000000000000000000",
3151 )
3152 .unwrap(),
3153 created_at: chrono::Utc::now().naive_utc(),
3154 };
3155
3156 let mut components = HashMap::new();
3157 components.insert("component1".to_string(), component);
3158
3159 let contract_ids =
3160 vec![Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()];
3161
3162 let request = SnapshotParameters::new(
3163 Chain::Ethereum,
3164 "test_protocol",
3165 &components,
3166 &contract_ids,
3167 12345,
3168 );
3169
3170 let response = client
3171 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3172 .await
3173 .expect("get snapshots");
3174
3175 protocol_states_mock.assert();
3177 contract_state_mock.assert();
3178 tvl_mock.assert();
3179
3180 assert_eq!(response.states.len(), 1);
3182 assert!(response
3183 .states
3184 .contains_key("component1"));
3185
3186 let component_state = response
3188 .states
3189 .get("component1")
3190 .unwrap();
3191 assert_eq!(component_state.component_tvl, Some(1000000.0));
3192
3193 assert_eq!(response.vm_storage.len(), 1);
3195 let contract_addr = Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap();
3196 assert!(response
3197 .vm_storage
3198 .contains_key(&contract_addr));
3199 }
3200
3201 #[tokio::test]
3202 async fn test_get_snapshots_empty_components() {
3203 let server = Server::new_async().await;
3204 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3205 .expect("create client");
3206
3207 let components = HashMap::new();
3208 let contract_ids = vec![];
3209
3210 let request = SnapshotParameters::new(
3211 Chain::Ethereum,
3212 "test_protocol",
3213 &components,
3214 &contract_ids,
3215 12345,
3216 );
3217
3218 let response = client
3219 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3220 .await
3221 .expect("get snapshots");
3222
3223 assert!(response.states.is_empty());
3225 assert!(response.vm_storage.is_empty());
3226 }
3227
3228 #[tokio::test]
3229 async fn test_get_snapshots_without_tvl() {
3230 let mut server = Server::new_async().await;
3231
3232 let protocol_states_resp = r#"
3233 {
3234 "states": [
3235 {
3236 "component_id": "component1",
3237 "attributes": {},
3238 "balances": {}
3239 }
3240 ],
3241 "pagination": {
3242 "page": 0,
3243 "page_size": 100,
3244 "total": 1
3245 }
3246 }
3247 "#;
3248
3249 let protocol_states_mock = server
3250 .mock("POST", "/v1/protocol_state")
3251 .expect(1)
3252 .with_body(protocol_states_resp)
3253 .create_async()
3254 .await;
3255
3256 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3257 .expect("create client");
3258
3259 let component = tycho_common::models::protocol::ProtocolComponent {
3261 id: "component1".to_string(),
3262 protocol_system: "test_protocol".to_string(),
3263 protocol_type_name: "test_type".to_string(),
3264 chain: Chain::Ethereum,
3265 tokens: vec![],
3266 contract_addresses: vec![],
3267 static_attributes: HashMap::new(),
3268 change: tycho_common::models::ChangeType::Creation,
3269 creation_tx: Bytes::from_str(
3270 "0x0000000000000000000000000000000000000000000000000000000000000000",
3271 )
3272 .unwrap(),
3273 created_at: chrono::Utc::now().naive_utc(),
3274 };
3275
3276 let mut components = HashMap::new();
3277 components.insert("component1".to_string(), component);
3278 let contract_ids = vec![];
3279
3280 let request = SnapshotParameters::new(
3281 Chain::Ethereum,
3282 "test_protocol",
3283 &components,
3284 &contract_ids,
3285 12345,
3286 )
3287 .include_balances(false)
3288 .include_tvl(false);
3289
3290 let response = client
3291 .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3292 .await
3293 .expect("get snapshots");
3294
3295 protocol_states_mock.assert();
3297 assert_eq!(response.states.len(), 1);
3301 let component_state = response
3303 .states
3304 .get("component1")
3305 .unwrap();
3306 assert_eq!(component_state.component_tvl, None);
3307 }
3308
3309 #[tokio::test]
3310 async fn test_compression_enabled() {
3311 let mut server = Server::new_async().await;
3312 let server_resp = GET_CONTRACT_STATE_RESP;
3313
3314 let compressed_body =
3316 zstd::encode_all(server_resp.as_bytes(), 0).expect("compression failed");
3317
3318 let mocked_server = server
3319 .mock("POST", "/v1/contract_state")
3320 .expect(1)
3321 .with_header("Content-Encoding", "zstd")
3322 .with_body(compressed_body)
3323 .create_async()
3324 .await;
3325
3326 let client = HttpRPCClient::new(
3328 server.url().as_str(),
3329 HttpRPCClientOptions::new().with_compression(true),
3330 )
3331 .expect("create client");
3332
3333 let response = client
3334 .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3335 .await
3336 .expect("get state");
3337 let accounts = response;
3338
3339 mocked_server.assert();
3340 assert_eq!(accounts.data().len(), 1);
3341 assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3342 }
3343
3344 #[tokio::test]
3345 async fn test_compression_disabled() {
3346 let mut server = Server::new_async().await;
3347 let server_resp = GET_CONTRACT_STATE_RESP;
3348
3349 let mocked_server = server
3352 .mock("POST", "/v1/contract_state")
3353 .expect(1)
3354 .match_header("Accept-Encoding", mockito::Matcher::Missing)
3355 .with_status(200)
3356 .with_body(server_resp)
3357 .create_async()
3358 .await;
3359
3360 let client = HttpRPCClient::new(
3362 server.url().as_str(),
3363 HttpRPCClientOptions::new().with_compression(false),
3364 )
3365 .expect("create client");
3366
3367 let response = client
3368 .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3369 .await
3370 .expect("get state");
3371 let accounts = response;
3372
3373 mocked_server.assert();
3375 assert_eq!(accounts.data().len(), 1);
3376 assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3377 }
3378
3379 #[rstest]
3380 #[case::single_page(2, 1000)]
3381 #[case::multiple_pages_within_concurrency(10, 2)]
3382 #[case::exceeds_concurrency_limit(60, 2)]
3383 #[tokio::test]
3384 async fn test_get_all_tokens_pagination_and_concurrency(
3385 #[case] total_tokens: usize,
3386 #[case] page_size: usize,
3387 ) {
3388 use std::sync::atomic::{AtomicUsize, Ordering};
3389
3390 let allowed_concurrency = 10;
3391
3392 let concurrent_requests = Arc::new(AtomicUsize::new(0));
3393 let max_concurrent = Arc::new(AtomicUsize::new(0));
3394
3395 let mut server = Server::new_async().await;
3396
3397 let total_pages = (total_tokens as f64 / page_size as f64).ceil() as i64;
3398
3399 for page in 0..total_pages {
3401 let concurrent = concurrent_requests.clone();
3402 let max_conc = max_concurrent.clone();
3403
3404 let tokens_in_page = {
3405 let start_idx = (page as usize) * page_size;
3406 let end_idx = ((page as usize + 1) * page_size).min(total_tokens);
3407 (start_idx..end_idx)
3408 .map(|i| {
3409 format!(
3410 r#"{{
3411 "chain": "ethereum",
3412 "address": "0x{i:040x}",
3413 "symbol": "TOKEN_{i}",
3414 "decimals": 18,
3415 "tax": 0,
3416 "gas": [30000],
3417 "quality": 100
3418 }}"#
3419 )
3420 })
3421 .collect::<Vec<_>>()
3422 };
3423
3424 let tokens_json = tokens_in_page.join(",");
3425 let response = format!(
3426 r#"{{
3427 "tokens": [{tokens_json}],
3428 "pagination": {{
3429 "page": {page},
3430 "page_size": {page_size},
3431 "total": {total_tokens}
3432 }}
3433 }}"#,
3434 );
3435
3436 server
3437 .mock("POST", "/v1/tokens")
3438 .expect(1)
3439 .with_chunked_body(move |w| {
3440 let current = concurrent.fetch_add(1, Ordering::SeqCst);
3442 max_conc.fetch_max(current + 1, Ordering::SeqCst);
3443
3444 std::thread::sleep(Duration::from_millis(10));
3446
3447 concurrent.fetch_sub(1, Ordering::SeqCst);
3448
3449 w.write_all(response.as_bytes())
3450 })
3451 .create_async()
3452 .await;
3453 }
3454
3455 let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3456 .expect("create client");
3457
3458 let tokens = client
3459 .get_all_tokens(
3460 AllTokensParams::new(Chain::Ethereum, allowed_concurrency)
3461 .with_chunk_size(page_size),
3462 )
3463 .await
3464 .expect("get all tokens");
3465
3466 let max = max_concurrent.load(Ordering::SeqCst);
3468 let expected_max_concurrency = (total_pages as usize)
3469 .saturating_sub(1)
3470 .min(allowed_concurrency);
3471 assert!(
3472 max <= allowed_concurrency,
3473 "Expected max concurrent requests <= {allowed_concurrency}, got {max}"
3474 );
3475
3476 if total_pages > 1 && expected_max_concurrency > 1 {
3478 assert!(
3479 max > 0,
3480 "Expected some concurrent requests for multi-page response, got {max}"
3481 );
3482 }
3483
3484 assert_eq!(
3486 tokens.len(),
3487 total_tokens,
3488 "Expected {total_tokens} tokens, got {}",
3489 tokens.len()
3490 );
3491
3492 for (i, token) in tokens.iter().enumerate() {
3494 assert_eq!(token.symbol, format!("TOKEN_{i}"), "Token at index {i} has wrong symbol");
3495 }
3496 }
3497}