1use std::{collections::HashMap, sync::Arc};
7
8use async_trait::async_trait;
9use futures03::future::try_join_all;
10#[cfg(test)]
11use mockall::automock;
12use reqwest::{header, Client, ClientBuilder, Url};
13use thiserror::Error;
14use tokio::sync::Semaphore;
15use tracing::{debug, error, instrument, trace, warn};
16use tycho_common::{
17 dto::{
18 Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse, PaginationParams,
19 PaginationResponse, ProtocolComponentRequestResponse, ProtocolComponentsRequestBody,
20 ProtocolStateRequestBody, ProtocolStateRequestResponse, ProtocolSystemsRequestBody,
21 ProtocolSystemsRequestResponse, ResponseToken, StateRequestBody, StateRequestResponse,
22 TokensRequestBody, TokensRequestResponse, VersionParam,
23 },
24 Bytes,
25};
26
27use crate::TYCHO_SERVER_VERSION;
28
/// Errors returned by [`RPCClient`] methods and by [`HttpRPCClient::new`].
#[derive(Error, Debug)]
pub enum RPCError {
    /// The base URL could not be parsed. Holds the offending input and the parser's message.
    #[error("Failed to parse URL: {0}. Error: {1}")]
    UrlParsing(String, String),
    /// A request could not be built from the supplied arguments.
    #[error("Failed to format request: {0}")]
    FormatRequest(String),
    /// The HTTP client could not be built or the request could not be sent.
    #[error("Unexpected HTTP client error: {0}")]
    HttpClient(String),
    /// The response body could not be read or deserialized.
    #[error("Failed to parse response: {0}")]
    ParseResponse(String),
    /// An unrecoverable failure, e.g. the concurrency-limiting semaphore was closed.
    #[error("Fatal error: {0}")]
    Fatal(String),
}
46
47#[cfg_attr(test, automock)]
48#[async_trait]
49pub trait RPCClient: Send + Sync {
    /// Performs a single contract-state request and returns the response.
    ///
    /// See [`RPCClient::get_contract_state_paginated`] for a variant that splits a large
    /// id list into several requests.
    async fn get_contract_state(
        &self,
        request: &StateRequestBody,
    ) -> Result<StateRequestResponse, RPCError>;
55
56 async fn get_contract_state_paginated(
57 &self,
58 chain: Chain,
59 ids: &[Bytes],
60 protocol_system: &str,
61 version: &VersionParam,
62 chunk_size: usize,
63 concurrency: usize,
64 ) -> Result<StateRequestResponse, RPCError> {
65 let semaphore = Arc::new(Semaphore::new(concurrency));
66
67 let chunked_bodies = ids
68 .chunks(chunk_size)
69 .map(|chunk| StateRequestBody {
70 contract_ids: Some(chunk.to_vec()),
71 protocol_system: protocol_system.to_string(),
72 chain,
73 version: version.clone(),
74 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
75 })
76 .collect::<Vec<_>>();
77
78 let mut tasks = Vec::new();
79 for body in chunked_bodies.iter() {
80 let sem = semaphore.clone();
81 tasks.push(async move {
82 let _permit = sem
83 .acquire()
84 .await
85 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
86 self.get_contract_state(body).await
87 });
88 }
89
90 let responses = try_join_all(tasks).await?;
92
93 let accounts = responses
95 .iter()
96 .flat_map(|r| r.accounts.clone())
97 .collect();
98 let total: i64 = responses
99 .iter()
100 .map(|r| r.pagination.total)
101 .sum();
102
103 Ok(StateRequestResponse {
104 accounts,
105 pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
106 })
107 }
108
    /// Performs a single protocol-components request and returns the response.
    ///
    /// See [`RPCClient::get_protocol_components_paginated`] for a variant that collects
    /// all pages.
    async fn get_protocol_components(
        &self,
        request: &ProtocolComponentsRequestBody,
    ) -> Result<ProtocolComponentRequestResponse, RPCError>;
113
114 async fn get_protocol_components_paginated(
115 &self,
116 request: &ProtocolComponentsRequestBody,
117 chunk_size: usize,
118 concurrency: usize,
119 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
120 let semaphore = Arc::new(Semaphore::new(concurrency));
121
122 match request.component_ids {
125 Some(ref ids) => {
126 let chunked_bodies = ids
128 .chunks(chunk_size)
129 .enumerate()
130 .map(|(index, _)| ProtocolComponentsRequestBody {
131 protocol_system: request.protocol_system.clone(),
132 component_ids: request.component_ids.clone(),
133 tvl_gt: request.tvl_gt,
134 chain: request.chain,
135 pagination: PaginationParams {
136 page: index as i64,
137 page_size: chunk_size as i64,
138 },
139 })
140 .collect::<Vec<_>>();
141
142 let mut tasks = Vec::new();
143 for body in chunked_bodies.iter() {
144 let sem = semaphore.clone();
145 tasks.push(async move {
146 let _permit = sem
147 .acquire()
148 .await
149 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
150 self.get_protocol_components(body).await
151 });
152 }
153
154 try_join_all(tasks)
155 .await
156 .map(|responses| ProtocolComponentRequestResponse {
157 protocol_components: responses
158 .into_iter()
159 .flat_map(|r| r.protocol_components.into_iter())
160 .collect(),
161 pagination: PaginationResponse {
162 page: 0,
163 page_size: chunk_size as i64,
164 total: ids.len() as i64,
165 },
166 })
167 }
168 _ => {
169 let initial_request = ProtocolComponentsRequestBody {
173 protocol_system: request.protocol_system.clone(),
174 component_ids: request.component_ids.clone(),
175 tvl_gt: request.tvl_gt,
176 chain: request.chain,
177 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
178 };
179 let first_response = self
180 .get_protocol_components(&initial_request)
181 .await
182 .map_err(|err| RPCError::Fatal(err.to_string()))?;
183
184 let total_items = first_response.pagination.total;
185 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
186
187 let mut accumulated_response = ProtocolComponentRequestResponse {
189 protocol_components: first_response.protocol_components,
190 pagination: PaginationResponse {
191 page: 0,
192 page_size: chunk_size as i64,
193 total: total_items,
194 },
195 };
196
197 let mut page = 1;
198 while page < total_pages {
199 let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
200
201 let chunked_bodies = (0..requests_in_this_iteration)
203 .map(|iter| ProtocolComponentsRequestBody {
204 protocol_system: request.protocol_system.clone(),
205 component_ids: request.component_ids.clone(),
206 tvl_gt: request.tvl_gt,
207 chain: request.chain,
208 pagination: PaginationParams {
209 page: page + iter,
210 page_size: chunk_size as i64,
211 },
212 })
213 .collect::<Vec<_>>();
214
215 let tasks: Vec<_> = chunked_bodies
216 .iter()
217 .map(|body| {
218 let sem = semaphore.clone();
219 async move {
220 let _permit = sem.acquire().await.map_err(|_| {
221 RPCError::Fatal("Semaphore dropped".to_string())
222 })?;
223 self.get_protocol_components(body).await
224 }
225 })
226 .collect();
227
228 let responses = try_join_all(tasks)
229 .await
230 .map(|responses| {
231 let total = responses[0].pagination.total;
232 ProtocolComponentRequestResponse {
233 protocol_components: responses
234 .into_iter()
235 .flat_map(|r| r.protocol_components.into_iter())
236 .collect(),
237 pagination: PaginationResponse {
238 page,
239 page_size: chunk_size as i64,
240 total,
241 },
242 }
243 });
244
245 match responses {
247 Ok(mut resp) => {
248 accumulated_response
249 .protocol_components
250 .append(&mut resp.protocol_components);
251 }
252 Err(e) => return Err(e),
253 }
254
255 page += concurrency as i64;
256 }
257 Ok(accumulated_response)
258 }
259 }
260 }
261
    /// Performs a single protocol-state request and returns the response.
    ///
    /// See [`RPCClient::get_protocol_states_paginated`] for a variant that splits a large
    /// id list into several requests.
    async fn get_protocol_states(
        &self,
        request: &ProtocolStateRequestBody,
    ) -> Result<ProtocolStateRequestResponse, RPCError>;
266
267 #[allow(clippy::too_many_arguments)]
268 async fn get_protocol_states_paginated<T>(
269 &self,
270 chain: Chain,
271 ids: &[T],
272 protocol_system: &str,
273 include_balances: bool,
274 version: &VersionParam,
275 chunk_size: usize,
276 concurrency: usize,
277 ) -> Result<ProtocolStateRequestResponse, RPCError>
278 where
279 T: AsRef<str> + Sync + 'static,
280 {
281 let semaphore = Arc::new(Semaphore::new(concurrency));
282 let chunked_bodies = ids
283 .chunks(chunk_size)
284 .map(|c| ProtocolStateRequestBody {
285 protocol_ids: Some(
286 c.iter()
287 .map(|id| id.as_ref().to_string())
288 .collect(),
289 ),
290 protocol_system: protocol_system.to_string(),
291 chain,
292 include_balances,
293 version: version.clone(),
294 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
295 })
296 .collect::<Vec<_>>();
297
298 let mut tasks = Vec::new();
299 for body in chunked_bodies.iter() {
300 let sem = semaphore.clone();
301 tasks.push(async move {
302 let _permit = sem
303 .acquire()
304 .await
305 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
306 self.get_protocol_states(body).await
307 });
308 }
309
310 try_join_all(tasks)
311 .await
312 .map(|responses| {
313 let states = responses
314 .clone()
315 .into_iter()
316 .flat_map(|r| r.states)
317 .collect();
318 let total = responses
319 .iter()
320 .map(|r| r.pagination.total)
321 .sum();
322 ProtocolStateRequestResponse {
323 states,
324 pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
325 }
326 })
327 }
328
    /// Performs a single tokens request and returns the response.
    ///
    /// See [`RPCClient::get_all_tokens`] for a variant that walks through all pages.
    async fn get_tokens(
        &self,
        request: &TokensRequestBody,
    ) -> Result<TokensRequestResponse, RPCError>;
335
336 async fn get_all_tokens(
337 &self,
338 chain: Chain,
339 min_quality: Option<i32>,
340 traded_n_days_ago: Option<u64>,
341 chunk_size: usize,
342 ) -> Result<Vec<ResponseToken>, RPCError> {
343 let mut request_page = 0;
344 let mut all_tokens = Vec::new();
345 loop {
346 let mut response = self
347 .get_tokens(&TokensRequestBody {
348 token_addresses: None,
349 min_quality,
350 traded_n_days_ago,
351 pagination: PaginationParams {
352 page: request_page,
353 page_size: chunk_size.try_into().map_err(|_| {
354 RPCError::FormatRequest(
355 "Failed to convert chunk_size into i64".to_string(),
356 )
357 })?,
358 },
359 chain,
360 })
361 .await?;
362
363 let num_tokens = response.tokens.len();
364 all_tokens.append(&mut response.tokens);
365 request_page += 1;
366
367 if num_tokens < chunk_size {
368 break;
369 }
370 }
371 Ok(all_tokens)
372 }
373
    /// Performs a single protocol-systems request and returns the response.
    async fn get_protocol_systems(
        &self,
        request: &ProtocolSystemsRequestBody,
    ) -> Result<ProtocolSystemsRequestResponse, RPCError>;
378
    /// Performs a single component-TVL request and returns the response.
    ///
    /// See [`RPCClient::get_component_tvl_paginated`] for a variant that collects all
    /// pages.
    async fn get_component_tvl(
        &self,
        request: &ComponentTvlRequestBody,
    ) -> Result<ComponentTvlRequestResponse, RPCError>;
383
384 async fn get_component_tvl_paginated(
385 &self,
386 request: &ComponentTvlRequestBody,
387 chunk_size: usize,
388 concurrency: usize,
389 ) -> Result<ComponentTvlRequestResponse, RPCError> {
390 let semaphore = Arc::new(Semaphore::new(concurrency));
391
392 match request.component_ids {
393 Some(ref ids) => {
394 let chunked_requests = ids
395 .chunks(chunk_size)
396 .enumerate()
397 .map(|(index, _)| ComponentTvlRequestBody {
398 chain: request.chain,
399 protocol_system: request.protocol_system.clone(),
400 component_ids: Some(ids.clone()),
401 pagination: PaginationParams {
402 page: index as i64,
403 page_size: chunk_size as i64,
404 },
405 })
406 .collect::<Vec<_>>();
407
408 let tasks: Vec<_> = chunked_requests
409 .into_iter()
410 .map(|req| {
411 let sem = semaphore.clone();
412 async move {
413 let _permit = sem
414 .acquire()
415 .await
416 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
417 self.get_component_tvl(&req).await
418 }
419 })
420 .collect();
421
422 let responses = try_join_all(tasks).await?;
423
424 let mut merged_tvl = HashMap::new();
425 for resp in responses {
426 for (key, value) in resp.tvl {
427 *merged_tvl.entry(key).or_insert(0.0) = value;
428 }
429 }
430
431 Ok(ComponentTvlRequestResponse {
432 tvl: merged_tvl,
433 pagination: PaginationResponse {
434 page: 0,
435 page_size: chunk_size as i64,
436 total: ids.len() as i64,
437 },
438 })
439 }
440 _ => {
441 let first_request = ComponentTvlRequestBody {
442 chain: request.chain,
443 protocol_system: request.protocol_system.clone(),
444 component_ids: request.component_ids.clone(),
445 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
446 };
447
448 let first_response = self
449 .get_component_tvl(&first_request)
450 .await?;
451 let total_items = first_response.pagination.total;
452 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
453
454 let mut merged_tvl = first_response.tvl;
455
456 let mut page = 1;
457 while page < total_pages {
458 let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
459
460 let chunked_requests: Vec<_> = (0..requests_in_this_iteration)
461 .map(|i| ComponentTvlRequestBody {
462 chain: request.chain,
463 protocol_system: request.protocol_system.clone(),
464 component_ids: request.component_ids.clone(),
465 pagination: PaginationParams {
466 page: page + i,
467 page_size: chunk_size as i64,
468 },
469 })
470 .collect();
471
472 let tasks: Vec<_> = chunked_requests
473 .into_iter()
474 .map(|req| {
475 let sem = semaphore.clone();
476 async move {
477 let _permit = sem.acquire().await.map_err(|_| {
478 RPCError::Fatal("Semaphore dropped".to_string())
479 })?;
480 self.get_component_tvl(&req).await
481 }
482 })
483 .collect();
484
485 let responses = try_join_all(tasks).await?;
486
487 for resp in responses {
489 for (key, value) in resp.tvl {
490 *merged_tvl.entry(key).or_insert(0.0) += value;
491 }
492 }
493
494 page += concurrency as i64;
495 }
496
497 Ok(ComponentTvlRequestResponse {
498 tvl: merged_tvl,
499 pagination: PaginationResponse {
500 page: 0,
501 page_size: chunk_size as i64,
502 total: total_items,
503 },
504 })
505 }
506 }
507 }
508}
509
/// [`RPCClient`] implementation that talks to a Tycho server over HTTP.
#[derive(Debug, Clone)]
pub struct HttpRPCClient {
    /// HTTP client carrying the default headers configured in [`HttpRPCClient::new`].
    http_client: Client,
    /// Base URL of the server; endpoint paths are appended to it for each request.
    url: Url,
}
515
516impl HttpRPCClient {
517 pub fn new(base_uri: &str, auth_key: Option<&str>) -> Result<Self, RPCError> {
518 let uri = base_uri
519 .parse::<Url>()
520 .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
521
522 let mut headers = header::HeaderMap::new();
524 headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
525 let user_agent = format!("tycho-client-{}", env!("CARGO_PKG_VERSION"));
526 headers.insert(
527 header::USER_AGENT,
528 header::HeaderValue::from_str(&user_agent).expect("invalid user agent format"),
529 );
530
531 if let Some(key) = auth_key {
533 let mut auth_value = header::HeaderValue::from_str(key).expect("invalid key format");
534 auth_value.set_sensitive(true);
535 headers.insert(header::AUTHORIZATION, auth_value);
536 }
537
538 let client = ClientBuilder::new()
539 .default_headers(headers)
540 .http2_prior_knowledge()
541 .build()
542 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
543 Ok(Self { http_client: client, url: uri })
544 }
545}
546
547#[async_trait]
548impl RPCClient for HttpRPCClient {
549 #[instrument(skip(self, request))]
550 async fn get_contract_state(
551 &self,
552 request: &StateRequestBody,
553 ) -> Result<StateRequestResponse, RPCError> {
554 if request.contract_ids.is_none() ||
556 request
557 .contract_ids
558 .as_ref()
559 .unwrap()
560 .is_empty()
561 {
562 warn!("No contract ids specified in request.");
563 }
564
565 let uri = format!(
566 "{}/{}/contract_state",
567 self.url
568 .to_string()
569 .trim_end_matches('/'),
570 TYCHO_SERVER_VERSION
571 );
572 debug!(%uri, "Sending contract_state request to Tycho server");
573 trace!(?request, "Sending request to Tycho server");
574
575 let response = self
576 .http_client
577 .post(&uri)
578 .json(request)
579 .send()
580 .await
581 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
582 trace!(?response, "Received response from Tycho server");
583
584 let body = response
585 .text()
586 .await
587 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
588 if body.is_empty() {
589 return Ok(StateRequestResponse {
591 accounts: vec![],
592 pagination: PaginationResponse {
593 page: request.pagination.page,
594 page_size: request.pagination.page,
595 total: 0,
596 },
597 });
598 }
599
600 let accounts = serde_json::from_str::<StateRequestResponse>(&body)
601 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
602 trace!(?accounts, "Received contract_state response from Tycho server");
603
604 Ok(accounts)
605 }
606
607 async fn get_protocol_components(
608 &self,
609 request: &ProtocolComponentsRequestBody,
610 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
611 let uri = format!(
612 "{}/{}/protocol_components",
613 self.url
614 .to_string()
615 .trim_end_matches('/'),
616 TYCHO_SERVER_VERSION,
617 );
618 debug!(%uri, "Sending protocol_components request to Tycho server");
619 trace!(?request, "Sending request to Tycho server");
620
621 let response = self
622 .http_client
623 .post(uri)
624 .header(header::CONTENT_TYPE, "application/json")
625 .json(request)
626 .send()
627 .await
628 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
629
630 trace!(?response, "Received response from Tycho server");
631
632 let body = response
633 .text()
634 .await
635 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
636 let components = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
637 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
638 trace!(?components, "Received protocol_components response from Tycho server");
639
640 Ok(components)
641 }
642
643 async fn get_protocol_states(
644 &self,
645 request: &ProtocolStateRequestBody,
646 ) -> Result<ProtocolStateRequestResponse, RPCError> {
647 if request.protocol_ids.is_none() ||
649 request
650 .protocol_ids
651 .as_ref()
652 .unwrap()
653 .is_empty()
654 {
655 warn!("No protocol ids specified in request.");
656 }
657
658 let uri = format!(
659 "{}/{}/protocol_state",
660 self.url
661 .to_string()
662 .trim_end_matches('/'),
663 TYCHO_SERVER_VERSION
664 );
665 debug!(%uri, "Sending protocol_states request to Tycho server");
666 trace!(?request, "Sending request to Tycho server");
667
668 let response = self
669 .http_client
670 .post(&uri)
671 .json(request)
672 .send()
673 .await
674 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
675 trace!(?response, "Received response from Tycho server");
676
677 let body = response
678 .text()
679 .await
680 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
681
682 if body.is_empty() {
683 return Ok(ProtocolStateRequestResponse {
685 states: vec![],
686 pagination: PaginationResponse {
687 page: request.pagination.page,
688 page_size: request.pagination.page_size,
689 total: 0,
690 },
691 });
692 }
693
694 let states = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
695 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
696 trace!(?states, "Received protocol_states response from Tycho server");
697
698 Ok(states)
699 }
700
701 async fn get_tokens(
702 &self,
703 request: &TokensRequestBody,
704 ) -> Result<TokensRequestResponse, RPCError> {
705 let uri = format!(
706 "{}/{}/tokens",
707 self.url
708 .to_string()
709 .trim_end_matches('/'),
710 TYCHO_SERVER_VERSION
711 );
712 debug!(%uri, "Sending tokens request to Tycho server");
713
714 let response = self
715 .http_client
716 .post(&uri)
717 .json(request)
718 .send()
719 .await
720 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
721
722 let body = response
723 .text()
724 .await
725 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
726 let tokens = serde_json::from_str::<TokensRequestResponse>(&body)
727 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
728
729 Ok(tokens)
730 }
731
732 async fn get_protocol_systems(
733 &self,
734 request: &ProtocolSystemsRequestBody,
735 ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
736 let uri = format!(
737 "{}/{}/protocol_systems",
738 self.url
739 .to_string()
740 .trim_end_matches('/'),
741 TYCHO_SERVER_VERSION
742 );
743 debug!(%uri, "Sending protocol_systems request to Tycho server");
744 trace!(?request, "Sending request to Tycho server");
745 let response = self
746 .http_client
747 .post(&uri)
748 .json(request)
749 .send()
750 .await
751 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
752 trace!(?response, "Received response from Tycho server");
753 let body = response
754 .text()
755 .await
756 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
757 let protocol_systems = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
758 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
759 trace!(?protocol_systems, "Received protocol_systems response from Tycho server");
760 Ok(protocol_systems)
761 }
762
763 async fn get_component_tvl(
764 &self,
765 request: &ComponentTvlRequestBody,
766 ) -> Result<ComponentTvlRequestResponse, RPCError> {
767 let uri = format!(
768 "{}/{}/component_tvl",
769 self.url
770 .to_string()
771 .trim_end_matches('/'),
772 TYCHO_SERVER_VERSION
773 );
774 debug!(%uri, "Sending get_component_tvl request to Tycho server");
775 trace!(?request, "Sending request to Tycho server");
776 let response = self
777 .http_client
778 .post(&uri)
779 .json(request)
780 .send()
781 .await
782 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
783 trace!(?response, "Received response from Tycho server");
784 let body = response
785 .text()
786 .await
787 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
788 let component_tvl =
789 serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
790 error!("Failed to parse component_tvl response: {:?}", &body);
791 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
792 })?;
793 trace!(?component_tvl, "Received component_tvl response from Tycho server");
794 Ok(component_tvl)
795 }
796}
797
798#[cfg(test)]
799mod tests {
800 use std::{collections::HashMap, str::FromStr};
801
802 use mockito::Server;
803 use rstest::rstest;
804 #[allow(deprecated)]
806 use tycho_common::dto::ProtocolId;
807
808 use super::*;
809
810 impl MockRPCClient {
813 #[allow(clippy::too_many_arguments)]
814 async fn test_get_protocol_states_paginated<T>(
815 &self,
816 chain: Chain,
817 ids: &[T],
818 protocol_system: &str,
819 include_balances: bool,
820 version: &VersionParam,
821 chunk_size: usize,
822 _concurrency: usize,
823 ) -> Vec<ProtocolStateRequestBody>
824 where
825 T: AsRef<str> + Clone + Send + Sync + 'static,
826 {
827 ids.chunks(chunk_size)
828 .map(|chunk| ProtocolStateRequestBody {
829 protocol_ids: Some(
830 chunk
831 .iter()
832 .map(|id| id.as_ref().to_string())
833 .collect(),
834 ),
835 protocol_system: protocol_system.to_string(),
836 chain,
837 include_balances,
838 version: version.clone(),
839 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
840 })
841 .collect()
842 }
843 }
844
845 #[allow(deprecated)]
847 #[rstest]
848 #[case::protocol_id_input(vec![
849 ProtocolId { id: "id1".to_string(), chain: Chain::Ethereum },
850 ProtocolId { id: "id2".to_string(), chain: Chain::Ethereum }
851 ])]
852 #[case::string_input(vec![
853 "id1".to_string(),
854 "id2".to_string()
855 ])]
856 #[tokio::test]
857 async fn test_get_protocol_states_paginated_backwards_compatibility<T>(#[case] ids: Vec<T>)
858 where
859 T: AsRef<str> + Clone + Send + Sync + 'static,
860 {
861 let mock_client = MockRPCClient::new();
862
863 let request_bodies = mock_client
864 .test_get_protocol_states_paginated(
865 Chain::Ethereum,
866 &ids,
867 "test_system",
868 true,
869 &VersionParam::default(),
870 2,
871 2,
872 )
873 .await;
874
875 assert_eq!(request_bodies.len(), 1);
877 assert_eq!(
878 request_bodies[0]
879 .protocol_ids
880 .as_ref()
881 .unwrap()
882 .len(),
883 2
884 );
885 }
886
887 #[tokio::test]
888 async fn test_get_contract_state() {
889 let mut server = Server::new_async().await;
890 let server_resp = r#"
891 {
892 "accounts": [
893 {
894 "chain": "ethereum",
895 "address": "0x0000000000000000000000000000000000000000",
896 "title": "",
897 "slots": {},
898 "native_balance": "0x01f4",
899 "token_balances": {},
900 "code": "0x00",
901 "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
902 "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
903 "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
904 "creation_tx": null
905 }
906 ],
907 "pagination": {
908 "page": 0,
909 "page_size": 20,
910 "total": 10
911 }
912 }
913 "#;
914 serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
916
917 let mocked_server = server
918 .mock("POST", "/v1/contract_state")
919 .expect(1)
920 .with_body(server_resp)
921 .create_async()
922 .await;
923
924 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
925
926 let response = client
927 .get_contract_state(&Default::default())
928 .await
929 .expect("get state");
930 let accounts = response.accounts;
931
932 mocked_server.assert();
933 assert_eq!(accounts.len(), 1);
934 assert_eq!(accounts[0].slots, HashMap::new());
935 assert_eq!(accounts[0].native_balance, Bytes::from(500u16.to_be_bytes()));
936 assert_eq!(accounts[0].code, [0].to_vec());
937 assert_eq!(
938 accounts[0].code_hash,
939 hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
940 .unwrap()
941 );
942 }
943
944 #[tokio::test]
945 async fn test_get_protocol_components() {
946 let mut server = Server::new_async().await;
947 let server_resp = r#"
948 {
949 "protocol_components": [
950 {
951 "id": "State1",
952 "protocol_system": "ambient",
953 "protocol_type_name": "Pool",
954 "chain": "ethereum",
955 "tokens": [
956 "0x0000000000000000000000000000000000000000",
957 "0x0000000000000000000000000000000000000001"
958 ],
959 "contract_ids": [
960 "0x0000000000000000000000000000000000000000"
961 ],
962 "static_attributes": {
963 "attribute_1": "0x00000000000003e8"
964 },
965 "change": "Creation",
966 "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
967 "created_at": "2022-01-01T00:00:00"
968 }
969 ],
970 "pagination": {
971 "page": 0,
972 "page_size": 20,
973 "total": 10
974 }
975 }
976 "#;
977 serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
979
980 let mocked_server = server
981 .mock("POST", "/v1/protocol_components")
982 .expect(1)
983 .with_body(server_resp)
984 .create_async()
985 .await;
986
987 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
988
989 let response = client
990 .get_protocol_components(&Default::default())
991 .await
992 .expect("get state");
993 let components = response.protocol_components;
994
995 mocked_server.assert();
996 assert_eq!(components.len(), 1);
997 assert_eq!(components[0].id, "State1");
998 assert_eq!(components[0].protocol_system, "ambient");
999 assert_eq!(components[0].protocol_type_name, "Pool");
1000 assert_eq!(components[0].tokens.len(), 2);
1001 let expected_attributes =
1002 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1003 .iter()
1004 .cloned()
1005 .collect::<HashMap<String, Bytes>>();
1006 assert_eq!(components[0].static_attributes, expected_attributes);
1007 }
1008
1009 #[tokio::test]
1010 async fn test_get_protocol_states() {
1011 let mut server = Server::new_async().await;
1012 let server_resp = r#"
1013 {
1014 "states": [
1015 {
1016 "component_id": "State1",
1017 "attributes": {
1018 "attribute_1": "0x00000000000003e8"
1019 },
1020 "balances": {
1021 "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
1022 }
1023 }
1024 ],
1025 "pagination": {
1026 "page": 0,
1027 "page_size": 20,
1028 "total": 10
1029 }
1030 }
1031 "#;
1032 serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
1034
1035 let mocked_server = server
1036 .mock("POST", "/v1/protocol_state")
1037 .expect(1)
1038 .with_body(server_resp)
1039 .create_async()
1040 .await;
1041 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1042
1043 let response = client
1044 .get_protocol_states(&Default::default())
1045 .await
1046 .expect("get state");
1047 let states = response.states;
1048
1049 mocked_server.assert();
1050 assert_eq!(states.len(), 1);
1051 assert_eq!(states[0].component_id, "State1");
1052 let expected_attributes =
1053 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1054 .iter()
1055 .cloned()
1056 .collect::<HashMap<String, Bytes>>();
1057 assert_eq!(states[0].attributes, expected_attributes);
1058 let expected_balances = [(
1059 Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
1060 .expect("Unsupported address format"),
1061 Bytes::from_str("0x01f4").unwrap(),
1062 )]
1063 .iter()
1064 .cloned()
1065 .collect::<HashMap<Bytes, Bytes>>();
1066 assert_eq!(states[0].balances, expected_balances);
1067 }
1068
1069 #[tokio::test]
1070 async fn test_get_tokens() {
1071 let mut server = Server::new_async().await;
1072 let server_resp = r#"
1073 {
1074 "tokens": [
1075 {
1076 "chain": "ethereum",
1077 "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
1078 "symbol": "WETH",
1079 "decimals": 18,
1080 "tax": 0,
1081 "gas": [
1082 29962
1083 ],
1084 "quality": 100
1085 },
1086 {
1087 "chain": "ethereum",
1088 "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
1089 "symbol": "USDC",
1090 "decimals": 6,
1091 "tax": 0,
1092 "gas": [
1093 40652
1094 ],
1095 "quality": 100
1096 }
1097 ],
1098 "pagination": {
1099 "page": 0,
1100 "page_size": 20,
1101 "total": 10
1102 }
1103 }
1104 "#;
1105 serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
1107
1108 let mocked_server = server
1109 .mock("POST", "/v1/tokens")
1110 .expect(1)
1111 .with_body(server_resp)
1112 .create_async()
1113 .await;
1114 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1115
1116 let response = client
1117 .get_tokens(&Default::default())
1118 .await
1119 .expect("get tokens");
1120
1121 let expected = vec![
1122 ResponseToken {
1123 chain: Chain::Ethereum,
1124 address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
1125 symbol: "WETH".to_string(),
1126 decimals: 18,
1127 tax: 0,
1128 gas: vec![Some(29962)],
1129 quality: 100,
1130 },
1131 ResponseToken {
1132 chain: Chain::Ethereum,
1133 address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
1134 symbol: "USDC".to_string(),
1135 decimals: 6,
1136 tax: 0,
1137 gas: vec![Some(40652)],
1138 quality: 100,
1139 },
1140 ];
1141
1142 mocked_server.assert();
1143 assert_eq!(response.tokens, expected);
1144 assert_eq!(response.pagination, PaginationResponse { page: 0, page_size: 20, total: 10 });
1145 }
1146
1147 #[tokio::test]
1148 async fn test_get_protocol_systems() {
1149 let mut server = Server::new_async().await;
1150 let server_resp = r#"
1151 {
1152 "protocol_systems": [
1153 "system1",
1154 "system2"
1155 ],
1156 "pagination": {
1157 "page": 0,
1158 "page_size": 20,
1159 "total": 10
1160 }
1161 }
1162 "#;
1163 serde_json::from_str::<ProtocolSystemsRequestResponse>(server_resp).expect("deserialize");
1165
1166 let mocked_server = server
1167 .mock("POST", "/v1/protocol_systems")
1168 .expect(1)
1169 .with_body(server_resp)
1170 .create_async()
1171 .await;
1172 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1173
1174 let response = client
1175 .get_protocol_systems(&Default::default())
1176 .await
1177 .expect("get protocol systems");
1178 let protocol_systems = response.protocol_systems;
1179
1180 mocked_server.assert();
1181 assert_eq!(protocol_systems, vec!["system1", "system2"]);
1182 }
1183
1184 #[tokio::test]
1185 async fn test_get_component_tvl() {
1186 let mut server = Server::new_async().await;
1187 let server_resp = r#"
1188 {
1189 "tvl": {
1190 "component1": 100.0
1191 },
1192 "pagination": {
1193 "page": 0,
1194 "page_size": 20,
1195 "total": 10
1196 }
1197 }
1198 "#;
1199 serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
1201
1202 let mocked_server = server
1203 .mock("POST", "/v1/component_tvl")
1204 .expect(1)
1205 .with_body(server_resp)
1206 .create_async()
1207 .await;
1208 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1209
1210 let response = client
1211 .get_component_tvl(&Default::default())
1212 .await
1213 .expect("get protocol systems");
1214 let component_tvl = response.tvl;
1215
1216 mocked_server.assert();
1217 assert_eq!(component_tvl.get("component1"), Some(&100.0));
1218 }
1219}