1use std::{collections::HashMap, sync::Arc};
7
8use async_trait::async_trait;
9use futures03::future::try_join_all;
10#[cfg(test)]
11use mockall::automock;
12use reqwest::{header, Client, ClientBuilder, Url};
13use thiserror::Error;
14use tokio::sync::Semaphore;
15use tracing::{debug, error, instrument, trace, warn};
16use tycho_common::{
17 dto::{
18 Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse, PaginationParams,
19 PaginationResponse, ProtocolComponentRequestResponse, ProtocolComponentsRequestBody,
20 ProtocolStateRequestBody, ProtocolStateRequestResponse, ProtocolSystemsRequestBody,
21 ProtocolSystemsRequestResponse, ResponseToken, StateRequestBody, StateRequestResponse,
22 TokensRequestBody, TokensRequestResponse, VersionParam,
23 },
24 Bytes,
25};
26
27use crate::TYCHO_SERVER_VERSION;
28
29#[derive(Error, Debug)]
30pub enum RPCError {
31 #[error("Failed to parse URL: {0}. Error: {1}")]
33 UrlParsing(String, String),
34
35 #[error("Failed to format request: {0}")]
37 FormatRequest(String),
38
39 #[error("Unexpected HTTP client error: {0}")]
41 HttpClient(String),
42
43 #[error("Failed to parse response: {0}")]
45 ParseResponse(String),
46
47 #[error("Fatal error: {0}")]
49 Fatal(String),
50}
51
52#[cfg_attr(test, automock)]
53#[async_trait]
54pub trait RPCClient: Send + Sync {
55 async fn get_contract_state(
57 &self,
58 request: &StateRequestBody,
59 ) -> Result<StateRequestResponse, RPCError>;
60
61 async fn get_contract_state_paginated(
62 &self,
63 chain: Chain,
64 ids: &[Bytes],
65 protocol_system: &str,
66 version: &VersionParam,
67 chunk_size: usize,
68 concurrency: usize,
69 ) -> Result<StateRequestResponse, RPCError> {
70 let semaphore = Arc::new(Semaphore::new(concurrency));
71
72 let chunked_bodies = ids
73 .chunks(chunk_size)
74 .map(|chunk| StateRequestBody {
75 contract_ids: Some(chunk.to_vec()),
76 protocol_system: protocol_system.to_string(),
77 chain,
78 version: version.clone(),
79 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
80 })
81 .collect::<Vec<_>>();
82
83 let mut tasks = Vec::new();
84 for body in chunked_bodies.iter() {
85 let sem = semaphore.clone();
86 tasks.push(async move {
87 let _permit = sem
88 .acquire()
89 .await
90 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
91 self.get_contract_state(body).await
92 });
93 }
94
95 let responses = try_join_all(tasks).await?;
97
98 let accounts = responses
100 .iter()
101 .flat_map(|r| r.accounts.clone())
102 .collect();
103 let total: i64 = responses
104 .iter()
105 .map(|r| r.pagination.total)
106 .sum();
107
108 Ok(StateRequestResponse {
109 accounts,
110 pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
111 })
112 }
113
114 async fn get_protocol_components(
115 &self,
116 request: &ProtocolComponentsRequestBody,
117 ) -> Result<ProtocolComponentRequestResponse, RPCError>;
118
119 async fn get_protocol_components_paginated(
120 &self,
121 request: &ProtocolComponentsRequestBody,
122 chunk_size: usize,
123 concurrency: usize,
124 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
125 let semaphore = Arc::new(Semaphore::new(concurrency));
126
127 match request.component_ids {
130 Some(ref ids) => {
131 let chunked_bodies = ids
133 .chunks(chunk_size)
134 .enumerate()
135 .map(|(index, _)| ProtocolComponentsRequestBody {
136 protocol_system: request.protocol_system.clone(),
137 component_ids: request.component_ids.clone(),
138 tvl_gt: request.tvl_gt,
139 chain: request.chain,
140 pagination: PaginationParams {
141 page: index as i64,
142 page_size: chunk_size as i64,
143 },
144 })
145 .collect::<Vec<_>>();
146
147 let mut tasks = Vec::new();
148 for body in chunked_bodies.iter() {
149 let sem = semaphore.clone();
150 tasks.push(async move {
151 let _permit = sem
152 .acquire()
153 .await
154 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
155 self.get_protocol_components(body).await
156 });
157 }
158
159 try_join_all(tasks)
160 .await
161 .map(|responses| ProtocolComponentRequestResponse {
162 protocol_components: responses
163 .into_iter()
164 .flat_map(|r| r.protocol_components.into_iter())
165 .collect(),
166 pagination: PaginationResponse {
167 page: 0,
168 page_size: chunk_size as i64,
169 total: ids.len() as i64,
170 },
171 })
172 }
173 _ => {
174 let initial_request = ProtocolComponentsRequestBody {
178 protocol_system: request.protocol_system.clone(),
179 component_ids: request.component_ids.clone(),
180 tvl_gt: request.tvl_gt,
181 chain: request.chain,
182 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
183 };
184 let first_response = self
185 .get_protocol_components(&initial_request)
186 .await
187 .map_err(|err| RPCError::Fatal(err.to_string()))?;
188
189 let total_items = first_response.pagination.total;
190 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
191
192 let mut accumulated_response = ProtocolComponentRequestResponse {
194 protocol_components: first_response.protocol_components,
195 pagination: PaginationResponse {
196 page: 0,
197 page_size: chunk_size as i64,
198 total: total_items,
199 },
200 };
201
202 let mut page = 1;
203 while page < total_pages {
204 let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
205
206 let chunked_bodies = (0..requests_in_this_iteration)
208 .map(|iter| ProtocolComponentsRequestBody {
209 protocol_system: request.protocol_system.clone(),
210 component_ids: request.component_ids.clone(),
211 tvl_gt: request.tvl_gt,
212 chain: request.chain,
213 pagination: PaginationParams {
214 page: page + iter,
215 page_size: chunk_size as i64,
216 },
217 })
218 .collect::<Vec<_>>();
219
220 let tasks: Vec<_> = chunked_bodies
221 .iter()
222 .map(|body| {
223 let sem = semaphore.clone();
224 async move {
225 let _permit = sem.acquire().await.map_err(|_| {
226 RPCError::Fatal("Semaphore dropped".to_string())
227 })?;
228 self.get_protocol_components(body).await
229 }
230 })
231 .collect();
232
233 let responses = try_join_all(tasks)
234 .await
235 .map(|responses| {
236 let total = responses[0].pagination.total;
237 ProtocolComponentRequestResponse {
238 protocol_components: responses
239 .into_iter()
240 .flat_map(|r| r.protocol_components.into_iter())
241 .collect(),
242 pagination: PaginationResponse {
243 page,
244 page_size: chunk_size as i64,
245 total,
246 },
247 }
248 });
249
250 match responses {
252 Ok(mut resp) => {
253 accumulated_response
254 .protocol_components
255 .append(&mut resp.protocol_components);
256 }
257 Err(e) => return Err(e),
258 }
259
260 page += concurrency as i64;
261 }
262 Ok(accumulated_response)
263 }
264 }
265 }
266
267 async fn get_protocol_states(
268 &self,
269 request: &ProtocolStateRequestBody,
270 ) -> Result<ProtocolStateRequestResponse, RPCError>;
271
272 #[allow(clippy::too_many_arguments)]
273 async fn get_protocol_states_paginated<T>(
274 &self,
275 chain: Chain,
276 ids: &[T],
277 protocol_system: &str,
278 include_balances: bool,
279 version: &VersionParam,
280 chunk_size: usize,
281 concurrency: usize,
282 ) -> Result<ProtocolStateRequestResponse, RPCError>
283 where
284 T: AsRef<str> + Sync + 'static,
285 {
286 let semaphore = Arc::new(Semaphore::new(concurrency));
287 let chunked_bodies = ids
288 .chunks(chunk_size)
289 .map(|c| ProtocolStateRequestBody {
290 protocol_ids: Some(
291 c.iter()
292 .map(|id| id.as_ref().to_string())
293 .collect(),
294 ),
295 protocol_system: protocol_system.to_string(),
296 chain,
297 include_balances,
298 version: version.clone(),
299 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
300 })
301 .collect::<Vec<_>>();
302
303 let mut tasks = Vec::new();
304 for body in chunked_bodies.iter() {
305 let sem = semaphore.clone();
306 tasks.push(async move {
307 let _permit = sem
308 .acquire()
309 .await
310 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
311 self.get_protocol_states(body).await
312 });
313 }
314
315 try_join_all(tasks)
316 .await
317 .map(|responses| {
318 let states = responses
319 .clone()
320 .into_iter()
321 .flat_map(|r| r.states)
322 .collect();
323 let total = responses
324 .iter()
325 .map(|r| r.pagination.total)
326 .sum();
327 ProtocolStateRequestResponse {
328 states,
329 pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
330 }
331 })
332 }
333
334 async fn get_tokens(
337 &self,
338 request: &TokensRequestBody,
339 ) -> Result<TokensRequestResponse, RPCError>;
340
341 async fn get_all_tokens(
342 &self,
343 chain: Chain,
344 min_quality: Option<i32>,
345 traded_n_days_ago: Option<u64>,
346 chunk_size: usize,
347 ) -> Result<Vec<ResponseToken>, RPCError> {
348 let mut request_page = 0;
349 let mut all_tokens = Vec::new();
350 loop {
351 let mut response = self
352 .get_tokens(&TokensRequestBody {
353 token_addresses: None,
354 min_quality,
355 traded_n_days_ago,
356 pagination: PaginationParams {
357 page: request_page,
358 page_size: chunk_size.try_into().map_err(|_| {
359 RPCError::FormatRequest(
360 "Failed to convert chunk_size into i64".to_string(),
361 )
362 })?,
363 },
364 chain,
365 })
366 .await?;
367
368 let num_tokens = response.tokens.len();
369 all_tokens.append(&mut response.tokens);
370 request_page += 1;
371
372 if num_tokens < chunk_size {
373 break;
374 }
375 }
376 Ok(all_tokens)
377 }
378
379 async fn get_protocol_systems(
380 &self,
381 request: &ProtocolSystemsRequestBody,
382 ) -> Result<ProtocolSystemsRequestResponse, RPCError>;
383
384 async fn get_component_tvl(
385 &self,
386 request: &ComponentTvlRequestBody,
387 ) -> Result<ComponentTvlRequestResponse, RPCError>;
388
389 async fn get_component_tvl_paginated(
390 &self,
391 request: &ComponentTvlRequestBody,
392 chunk_size: usize,
393 concurrency: usize,
394 ) -> Result<ComponentTvlRequestResponse, RPCError> {
395 let semaphore = Arc::new(Semaphore::new(concurrency));
396
397 match request.component_ids {
398 Some(ref ids) => {
399 let chunked_requests = ids
400 .chunks(chunk_size)
401 .enumerate()
402 .map(|(index, _)| ComponentTvlRequestBody {
403 chain: request.chain,
404 protocol_system: request.protocol_system.clone(),
405 component_ids: Some(ids.clone()),
406 pagination: PaginationParams {
407 page: index as i64,
408 page_size: chunk_size as i64,
409 },
410 })
411 .collect::<Vec<_>>();
412
413 let tasks: Vec<_> = chunked_requests
414 .into_iter()
415 .map(|req| {
416 let sem = semaphore.clone();
417 async move {
418 let _permit = sem
419 .acquire()
420 .await
421 .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
422 self.get_component_tvl(&req).await
423 }
424 })
425 .collect();
426
427 let responses = try_join_all(tasks).await?;
428
429 let mut merged_tvl = HashMap::new();
430 for resp in responses {
431 for (key, value) in resp.tvl {
432 *merged_tvl.entry(key).or_insert(0.0) = value;
433 }
434 }
435
436 Ok(ComponentTvlRequestResponse {
437 tvl: merged_tvl,
438 pagination: PaginationResponse {
439 page: 0,
440 page_size: chunk_size as i64,
441 total: ids.len() as i64,
442 },
443 })
444 }
445 _ => {
446 let first_request = ComponentTvlRequestBody {
447 chain: request.chain,
448 protocol_system: request.protocol_system.clone(),
449 component_ids: request.component_ids.clone(),
450 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
451 };
452
453 let first_response = self
454 .get_component_tvl(&first_request)
455 .await?;
456 let total_items = first_response.pagination.total;
457 let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
458
459 let mut merged_tvl = first_response.tvl;
460
461 let mut page = 1;
462 while page < total_pages {
463 let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
464
465 let chunked_requests: Vec<_> = (0..requests_in_this_iteration)
466 .map(|i| ComponentTvlRequestBody {
467 chain: request.chain,
468 protocol_system: request.protocol_system.clone(),
469 component_ids: request.component_ids.clone(),
470 pagination: PaginationParams {
471 page: page + i,
472 page_size: chunk_size as i64,
473 },
474 })
475 .collect();
476
477 let tasks: Vec<_> = chunked_requests
478 .into_iter()
479 .map(|req| {
480 let sem = semaphore.clone();
481 async move {
482 let _permit = sem.acquire().await.map_err(|_| {
483 RPCError::Fatal("Semaphore dropped".to_string())
484 })?;
485 self.get_component_tvl(&req).await
486 }
487 })
488 .collect();
489
490 let responses = try_join_all(tasks).await?;
491
492 for resp in responses {
494 for (key, value) in resp.tvl {
495 *merged_tvl.entry(key).or_insert(0.0) += value;
496 }
497 }
498
499 page += concurrency as i64;
500 }
501
502 Ok(ComponentTvlRequestResponse {
503 tvl: merged_tvl,
504 pagination: PaginationResponse {
505 page: 0,
506 page_size: chunk_size as i64,
507 total: total_items,
508 },
509 })
510 }
511 }
512 }
513}
514
515#[derive(Debug, Clone)]
516pub struct HttpRPCClient {
517 http_client: Client,
518 url: Url,
519}
520
521impl HttpRPCClient {
522 pub fn new(base_uri: &str, auth_key: Option<&str>) -> Result<Self, RPCError> {
523 let uri = base_uri
524 .parse::<Url>()
525 .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
526
527 let mut headers = header::HeaderMap::new();
529 headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
530 let user_agent = format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION"));
531 headers.insert(
532 header::USER_AGENT,
533 header::HeaderValue::from_str(&user_agent)
534 .map_err(|e| RPCError::FormatRequest(format!("Invalid user agent format: {e}")))?,
535 );
536
537 if let Some(key) = auth_key {
539 let mut auth_value = header::HeaderValue::from_str(key).map_err(|e| {
540 RPCError::FormatRequest(format!("Invalid authorization key format: {e}"))
541 })?;
542 auth_value.set_sensitive(true);
543 headers.insert(header::AUTHORIZATION, auth_value);
544 }
545
546 let client = ClientBuilder::new()
547 .default_headers(headers)
548 .http2_prior_knowledge()
549 .build()
550 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
551 Ok(Self { http_client: client, url: uri })
552 }
553}
554
555#[async_trait]
556impl RPCClient for HttpRPCClient {
557 #[instrument(skip(self, request))]
558 async fn get_contract_state(
559 &self,
560 request: &StateRequestBody,
561 ) -> Result<StateRequestResponse, RPCError> {
562 if request
564 .contract_ids
565 .as_ref()
566 .is_none_or(|ids| ids.is_empty())
567 {
568 warn!("No contract ids specified in request.");
569 }
570
571 let uri = format!(
572 "{}/{}/contract_state",
573 self.url
574 .to_string()
575 .trim_end_matches('/'),
576 TYCHO_SERVER_VERSION
577 );
578 debug!(%uri, "Sending contract_state request to Tycho server");
579 trace!(?request, "Sending request to Tycho server");
580
581 let response = self
582 .http_client
583 .post(&uri)
584 .json(request)
585 .send()
586 .await
587 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
588 trace!(?response, "Received response from Tycho server");
589
590 let body = response
591 .text()
592 .await
593 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
594 if body.is_empty() {
595 return Ok(StateRequestResponse {
597 accounts: vec![],
598 pagination: PaginationResponse {
599 page: request.pagination.page,
600 page_size: request.pagination.page,
601 total: 0,
602 },
603 });
604 }
605
606 let accounts = serde_json::from_str::<StateRequestResponse>(&body)
607 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
608 trace!(?accounts, "Received contract_state response from Tycho server");
609
610 Ok(accounts)
611 }
612
613 async fn get_protocol_components(
614 &self,
615 request: &ProtocolComponentsRequestBody,
616 ) -> Result<ProtocolComponentRequestResponse, RPCError> {
617 let uri = format!(
618 "{}/{}/protocol_components",
619 self.url
620 .to_string()
621 .trim_end_matches('/'),
622 TYCHO_SERVER_VERSION,
623 );
624 debug!(%uri, "Sending protocol_components request to Tycho server");
625 trace!(?request, "Sending request to Tycho server");
626
627 let response = self
628 .http_client
629 .post(uri)
630 .header(header::CONTENT_TYPE, "application/json")
631 .json(request)
632 .send()
633 .await
634 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
635
636 trace!(?response, "Received response from Tycho server");
637
638 let body = response
639 .text()
640 .await
641 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
642 let components = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
643 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
644 trace!(?components, "Received protocol_components response from Tycho server");
645
646 Ok(components)
647 }
648
649 async fn get_protocol_states(
650 &self,
651 request: &ProtocolStateRequestBody,
652 ) -> Result<ProtocolStateRequestResponse, RPCError> {
653 if request
655 .protocol_ids
656 .as_ref()
657 .is_none_or(|ids| ids.is_empty())
658 {
659 warn!("No protocol ids specified in request.");
660 }
661
662 let uri = format!(
663 "{}/{}/protocol_state",
664 self.url
665 .to_string()
666 .trim_end_matches('/'),
667 TYCHO_SERVER_VERSION
668 );
669 debug!(%uri, "Sending protocol_states request to Tycho server");
670 trace!(?request, "Sending request to Tycho server");
671
672 let response = self
673 .http_client
674 .post(&uri)
675 .json(request)
676 .send()
677 .await
678 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
679 trace!(?response, "Received response from Tycho server");
680
681 let body = response
682 .text()
683 .await
684 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
685
686 if body.is_empty() {
687 return Ok(ProtocolStateRequestResponse {
689 states: vec![],
690 pagination: PaginationResponse {
691 page: request.pagination.page,
692 page_size: request.pagination.page_size,
693 total: 0,
694 },
695 });
696 }
697
698 let states = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
699 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
700 trace!(?states, "Received protocol_states response from Tycho server");
701
702 Ok(states)
703 }
704
705 async fn get_tokens(
706 &self,
707 request: &TokensRequestBody,
708 ) -> Result<TokensRequestResponse, RPCError> {
709 let uri = format!(
710 "{}/{}/tokens",
711 self.url
712 .to_string()
713 .trim_end_matches('/'),
714 TYCHO_SERVER_VERSION
715 );
716 debug!(%uri, "Sending tokens request to Tycho server");
717
718 let response = self
719 .http_client
720 .post(&uri)
721 .json(request)
722 .send()
723 .await
724 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
725
726 let body = response
727 .text()
728 .await
729 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
730 let tokens = serde_json::from_str::<TokensRequestResponse>(&body)
731 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
732
733 Ok(tokens)
734 }
735
736 async fn get_protocol_systems(
737 &self,
738 request: &ProtocolSystemsRequestBody,
739 ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
740 let uri = format!(
741 "{}/{}/protocol_systems",
742 self.url
743 .to_string()
744 .trim_end_matches('/'),
745 TYCHO_SERVER_VERSION
746 );
747 debug!(%uri, "Sending protocol_systems request to Tycho server");
748 trace!(?request, "Sending request to Tycho server");
749 let response = self
750 .http_client
751 .post(&uri)
752 .json(request)
753 .send()
754 .await
755 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
756 trace!(?response, "Received response from Tycho server");
757 let body = response
758 .text()
759 .await
760 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
761 let protocol_systems = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
762 .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
763 trace!(?protocol_systems, "Received protocol_systems response from Tycho server");
764 Ok(protocol_systems)
765 }
766
767 async fn get_component_tvl(
768 &self,
769 request: &ComponentTvlRequestBody,
770 ) -> Result<ComponentTvlRequestResponse, RPCError> {
771 let uri = format!(
772 "{}/{}/component_tvl",
773 self.url
774 .to_string()
775 .trim_end_matches('/'),
776 TYCHO_SERVER_VERSION
777 );
778 debug!(%uri, "Sending get_component_tvl request to Tycho server");
779 trace!(?request, "Sending request to Tycho server");
780 let response = self
781 .http_client
782 .post(&uri)
783 .json(request)
784 .send()
785 .await
786 .map_err(|e| RPCError::HttpClient(e.to_string()))?;
787 trace!(?response, "Received response from Tycho server");
788 let body = response
789 .text()
790 .await
791 .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
792 let component_tvl =
793 serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
794 error!("Failed to parse component_tvl response: {:?}", &body);
795 RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
796 })?;
797 trace!(?component_tvl, "Received component_tvl response from Tycho server");
798 Ok(component_tvl)
799 }
800}
801
802#[cfg(test)]
803mod tests {
804 use std::{collections::HashMap, str::FromStr};
805
806 use mockito::Server;
807 use rstest::rstest;
808 #[allow(deprecated)]
810 use tycho_common::dto::ProtocolId;
811
812 use super::*;
813
814 impl MockRPCClient {
817 #[allow(clippy::too_many_arguments)]
818 async fn test_get_protocol_states_paginated<T>(
819 &self,
820 chain: Chain,
821 ids: &[T],
822 protocol_system: &str,
823 include_balances: bool,
824 version: &VersionParam,
825 chunk_size: usize,
826 _concurrency: usize,
827 ) -> Vec<ProtocolStateRequestBody>
828 where
829 T: AsRef<str> + Clone + Send + Sync + 'static,
830 {
831 ids.chunks(chunk_size)
832 .map(|chunk| ProtocolStateRequestBody {
833 protocol_ids: Some(
834 chunk
835 .iter()
836 .map(|id| id.as_ref().to_string())
837 .collect(),
838 ),
839 protocol_system: protocol_system.to_string(),
840 chain,
841 include_balances,
842 version: version.clone(),
843 pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
844 })
845 .collect()
846 }
847 }
848
849 #[allow(deprecated)]
851 #[rstest]
852 #[case::protocol_id_input(vec![
853 ProtocolId { id: "id1".to_string(), chain: Chain::Ethereum },
854 ProtocolId { id: "id2".to_string(), chain: Chain::Ethereum }
855 ])]
856 #[case::string_input(vec![
857 "id1".to_string(),
858 "id2".to_string()
859 ])]
860 #[tokio::test]
861 async fn test_get_protocol_states_paginated_backwards_compatibility<T>(#[case] ids: Vec<T>)
862 where
863 T: AsRef<str> + Clone + Send + Sync + 'static,
864 {
865 let mock_client = MockRPCClient::new();
866
867 let request_bodies = mock_client
868 .test_get_protocol_states_paginated(
869 Chain::Ethereum,
870 &ids,
871 "test_system",
872 true,
873 &VersionParam::default(),
874 2,
875 2,
876 )
877 .await;
878
879 assert_eq!(request_bodies.len(), 1);
881 assert_eq!(
882 request_bodies[0]
883 .protocol_ids
884 .as_ref()
885 .unwrap()
886 .len(),
887 2
888 );
889 }
890
891 #[tokio::test]
892 async fn test_get_contract_state() {
893 let mut server = Server::new_async().await;
894 let server_resp = r#"
895 {
896 "accounts": [
897 {
898 "chain": "ethereum",
899 "address": "0x0000000000000000000000000000000000000000",
900 "title": "",
901 "slots": {},
902 "native_balance": "0x01f4",
903 "token_balances": {},
904 "code": "0x00",
905 "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
906 "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
907 "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
908 "creation_tx": null
909 }
910 ],
911 "pagination": {
912 "page": 0,
913 "page_size": 20,
914 "total": 10
915 }
916 }
917 "#;
918 serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
920
921 let mocked_server = server
922 .mock("POST", "/v1/contract_state")
923 .expect(1)
924 .with_body(server_resp)
925 .create_async()
926 .await;
927
928 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
929
930 let response = client
931 .get_contract_state(&Default::default())
932 .await
933 .expect("get state");
934 let accounts = response.accounts;
935
936 mocked_server.assert();
937 assert_eq!(accounts.len(), 1);
938 assert_eq!(accounts[0].slots, HashMap::new());
939 assert_eq!(accounts[0].native_balance, Bytes::from(500u16.to_be_bytes()));
940 assert_eq!(accounts[0].code, [0].to_vec());
941 assert_eq!(
942 accounts[0].code_hash,
943 hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
944 .unwrap()
945 );
946 }
947
948 #[tokio::test]
949 async fn test_get_protocol_components() {
950 let mut server = Server::new_async().await;
951 let server_resp = r#"
952 {
953 "protocol_components": [
954 {
955 "id": "State1",
956 "protocol_system": "ambient",
957 "protocol_type_name": "Pool",
958 "chain": "ethereum",
959 "tokens": [
960 "0x0000000000000000000000000000000000000000",
961 "0x0000000000000000000000000000000000000001"
962 ],
963 "contract_ids": [
964 "0x0000000000000000000000000000000000000000"
965 ],
966 "static_attributes": {
967 "attribute_1": "0x00000000000003e8"
968 },
969 "change": "Creation",
970 "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
971 "created_at": "2022-01-01T00:00:00"
972 }
973 ],
974 "pagination": {
975 "page": 0,
976 "page_size": 20,
977 "total": 10
978 }
979 }
980 "#;
981 serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
983
984 let mocked_server = server
985 .mock("POST", "/v1/protocol_components")
986 .expect(1)
987 .with_body(server_resp)
988 .create_async()
989 .await;
990
991 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
992
993 let response = client
994 .get_protocol_components(&Default::default())
995 .await
996 .expect("get state");
997 let components = response.protocol_components;
998
999 mocked_server.assert();
1000 assert_eq!(components.len(), 1);
1001 assert_eq!(components[0].id, "State1");
1002 assert_eq!(components[0].protocol_system, "ambient");
1003 assert_eq!(components[0].protocol_type_name, "Pool");
1004 assert_eq!(components[0].tokens.len(), 2);
1005 let expected_attributes =
1006 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1007 .iter()
1008 .cloned()
1009 .collect::<HashMap<String, Bytes>>();
1010 assert_eq!(components[0].static_attributes, expected_attributes);
1011 }
1012
1013 #[tokio::test]
1014 async fn test_get_protocol_states() {
1015 let mut server = Server::new_async().await;
1016 let server_resp = r#"
1017 {
1018 "states": [
1019 {
1020 "component_id": "State1",
1021 "attributes": {
1022 "attribute_1": "0x00000000000003e8"
1023 },
1024 "balances": {
1025 "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
1026 }
1027 }
1028 ],
1029 "pagination": {
1030 "page": 0,
1031 "page_size": 20,
1032 "total": 10
1033 }
1034 }
1035 "#;
1036 serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
1038
1039 let mocked_server = server
1040 .mock("POST", "/v1/protocol_state")
1041 .expect(1)
1042 .with_body(server_resp)
1043 .create_async()
1044 .await;
1045 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1046
1047 let response = client
1048 .get_protocol_states(&Default::default())
1049 .await
1050 .expect("get state");
1051 let states = response.states;
1052
1053 mocked_server.assert();
1054 assert_eq!(states.len(), 1);
1055 assert_eq!(states[0].component_id, "State1");
1056 let expected_attributes =
1057 [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1058 .iter()
1059 .cloned()
1060 .collect::<HashMap<String, Bytes>>();
1061 assert_eq!(states[0].attributes, expected_attributes);
1062 let expected_balances = [(
1063 Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
1064 .expect("Unsupported address format"),
1065 Bytes::from_str("0x01f4").unwrap(),
1066 )]
1067 .iter()
1068 .cloned()
1069 .collect::<HashMap<Bytes, Bytes>>();
1070 assert_eq!(states[0].balances, expected_balances);
1071 }
1072
1073 #[tokio::test]
1074 async fn test_get_tokens() {
1075 let mut server = Server::new_async().await;
1076 let server_resp = r#"
1077 {
1078 "tokens": [
1079 {
1080 "chain": "ethereum",
1081 "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
1082 "symbol": "WETH",
1083 "decimals": 18,
1084 "tax": 0,
1085 "gas": [
1086 29962
1087 ],
1088 "quality": 100
1089 },
1090 {
1091 "chain": "ethereum",
1092 "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
1093 "symbol": "USDC",
1094 "decimals": 6,
1095 "tax": 0,
1096 "gas": [
1097 40652
1098 ],
1099 "quality": 100
1100 }
1101 ],
1102 "pagination": {
1103 "page": 0,
1104 "page_size": 20,
1105 "total": 10
1106 }
1107 }
1108 "#;
1109 serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
1111
1112 let mocked_server = server
1113 .mock("POST", "/v1/tokens")
1114 .expect(1)
1115 .with_body(server_resp)
1116 .create_async()
1117 .await;
1118 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1119
1120 let response = client
1121 .get_tokens(&Default::default())
1122 .await
1123 .expect("get tokens");
1124
1125 let expected = vec![
1126 ResponseToken {
1127 chain: Chain::Ethereum,
1128 address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
1129 symbol: "WETH".to_string(),
1130 decimals: 18,
1131 tax: 0,
1132 gas: vec![Some(29962)],
1133 quality: 100,
1134 },
1135 ResponseToken {
1136 chain: Chain::Ethereum,
1137 address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
1138 symbol: "USDC".to_string(),
1139 decimals: 6,
1140 tax: 0,
1141 gas: vec![Some(40652)],
1142 quality: 100,
1143 },
1144 ];
1145
1146 mocked_server.assert();
1147 assert_eq!(response.tokens, expected);
1148 assert_eq!(response.pagination, PaginationResponse { page: 0, page_size: 20, total: 10 });
1149 }
1150
1151 #[tokio::test]
1152 async fn test_get_protocol_systems() {
1153 let mut server = Server::new_async().await;
1154 let server_resp = r#"
1155 {
1156 "protocol_systems": [
1157 "system1",
1158 "system2"
1159 ],
1160 "pagination": {
1161 "page": 0,
1162 "page_size": 20,
1163 "total": 10
1164 }
1165 }
1166 "#;
1167 serde_json::from_str::<ProtocolSystemsRequestResponse>(server_resp).expect("deserialize");
1169
1170 let mocked_server = server
1171 .mock("POST", "/v1/protocol_systems")
1172 .expect(1)
1173 .with_body(server_resp)
1174 .create_async()
1175 .await;
1176 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1177
1178 let response = client
1179 .get_protocol_systems(&Default::default())
1180 .await
1181 .expect("get protocol systems");
1182 let protocol_systems = response.protocol_systems;
1183
1184 mocked_server.assert();
1185 assert_eq!(protocol_systems, vec!["system1", "system2"]);
1186 }
1187
1188 #[tokio::test]
1189 async fn test_get_component_tvl() {
1190 let mut server = Server::new_async().await;
1191 let server_resp = r#"
1192 {
1193 "tvl": {
1194 "component1": 100.0
1195 },
1196 "pagination": {
1197 "page": 0,
1198 "page_size": 20,
1199 "total": 10
1200 }
1201 }
1202 "#;
1203 serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
1205
1206 let mocked_server = server
1207 .mock("POST", "/v1/component_tvl")
1208 .expect(1)
1209 .with_body(server_resp)
1210 .create_async()
1211 .await;
1212 let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1213
1214 let response = client
1215 .get_component_tvl(&Default::default())
1216 .await
1217 .expect("get protocol systems");
1218 let component_tvl = response.tvl;
1219
1220 mocked_server.assert();
1221 assert_eq!(component_tvl.get("component1"), Some(&100.0));
1222 }
1223}