tycho_client/
rpc.rs

1//! # Tycho RPC Client
2//!
3//! The objective of this module is to provide swift and simplified access to the Remote Procedure
4//! Call (RPC) endpoints of Tycho. These endpoints are chiefly responsible for facilitating data
5//! queries, especially querying snapshots of data.
6use std::{collections::HashMap, sync::Arc};
7
8use async_trait::async_trait;
9use futures03::future::try_join_all;
10#[cfg(test)]
11use mockall::automock;
12use reqwest::{header, Client, ClientBuilder, Url};
13use thiserror::Error;
14use tokio::sync::Semaphore;
15use tracing::{debug, error, instrument, trace, warn};
16use tycho_common::{
17    dto::{
18        Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse, PaginationParams,
19        PaginationResponse, ProtocolComponentRequestResponse, ProtocolComponentsRequestBody,
20        ProtocolStateRequestBody, ProtocolStateRequestResponse, ProtocolSystemsRequestBody,
21        ProtocolSystemsRequestResponse, ResponseToken, StateRequestBody, StateRequestResponse,
22        TokensRequestBody, TokensRequestResponse, VersionParam,
23    },
24    Bytes,
25};
26
27use crate::TYCHO_SERVER_VERSION;
28
/// Errors reported by [`RPCClient`] implementations and by [`HttpRPCClient::new`].
///
/// All variants carry human-readable messages only; the underlying error values are
/// stringified before being stored.
#[derive(Error, Debug)]
pub enum RPCError {
    /// The passed tycho url failed to parse.
    ///
    /// Holds the offending URL followed by the parser's error message.
    #[error("Failed to parse URL: {0}. Error: {1}")]
    UrlParsing(String, String),
    /// The request data is not correctly formed.
    #[error("Failed to format request: {0}")]
    FormatRequest(String),
    /// Errors forwarded from the HTTP protocol.
    ///
    /// Also used when the underlying HTTP client itself cannot be built.
    #[error("Unexpected HTTP client error: {0}")]
    HttpClient(String),
    /// The response from the server could not be parsed correctly.
    #[error("Failed to parse response: {0}")]
    ParseResponse(String),
    /// An unrecoverable failure inside the client, e.g. a closed semaphore while scheduling
    /// paginated requests.
    #[error("Fatal error: {0}")]
    Fatal(String),
}
46
47#[cfg_attr(test, automock)]
48#[async_trait]
49pub trait RPCClient: Send + Sync {
    /// Retrieves a snapshot of contract state.
    ///
    /// Issues a single request described by `request` (including its pagination settings) and
    /// returns the corresponding response. To query many contracts in chunks, see
    /// `get_contract_state_paginated`.
    async fn get_contract_state(
        &self,
        request: &StateRequestBody,
    ) -> Result<StateRequestResponse, RPCError>;
55
56    async fn get_contract_state_paginated(
57        &self,
58        chain: Chain,
59        ids: &[Bytes],
60        protocol_system: &str,
61        version: &VersionParam,
62        chunk_size: usize,
63        concurrency: usize,
64    ) -> Result<StateRequestResponse, RPCError> {
65        let semaphore = Arc::new(Semaphore::new(concurrency));
66
67        let chunked_bodies = ids
68            .chunks(chunk_size)
69            .map(|chunk| StateRequestBody {
70                contract_ids: Some(chunk.to_vec()),
71                protocol_system: protocol_system.to_string(),
72                chain,
73                version: version.clone(),
74                pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
75            })
76            .collect::<Vec<_>>();
77
78        let mut tasks = Vec::new();
79        for body in chunked_bodies.iter() {
80            let sem = semaphore.clone();
81            tasks.push(async move {
82                let _permit = sem
83                    .acquire()
84                    .await
85                    .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
86                self.get_contract_state(body).await
87            });
88        }
89
90        // Execute all tasks concurrently with the defined concurrency limit.
91        let responses = try_join_all(tasks).await?;
92
93        // Aggregate the responses into a single result.
94        let accounts = responses
95            .iter()
96            .flat_map(|r| r.accounts.clone())
97            .collect();
98        let total: i64 = responses
99            .iter()
100            .map(|r| r.pagination.total)
101            .sum();
102
103        Ok(StateRequestResponse {
104            accounts,
105            pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
106        })
107    }
108
    /// Retrieves protocol components matching `request`.
    ///
    /// Issues a single request and therefore returns only the page selected by
    /// `request.pagination`. To collect all pages, see `get_protocol_components_paginated`.
    async fn get_protocol_components(
        &self,
        request: &ProtocolComponentsRequestBody,
    ) -> Result<ProtocolComponentRequestResponse, RPCError>;
113
114    async fn get_protocol_components_paginated(
115        &self,
116        request: &ProtocolComponentsRequestBody,
117        chunk_size: usize,
118        concurrency: usize,
119    ) -> Result<ProtocolComponentRequestResponse, RPCError> {
120        let semaphore = Arc::new(Semaphore::new(concurrency));
121
122        // If a set of component IDs is specified, the maximum return size is already known,
123        // allowing us to pre-compute the number of requests to be made.
124        match request.component_ids {
125            Some(ref ids) => {
126                // We can divide the component_ids into chunks of size chunk_size
127                let chunked_bodies = ids
128                    .chunks(chunk_size)
129                    .enumerate()
130                    .map(|(index, _)| ProtocolComponentsRequestBody {
131                        protocol_system: request.protocol_system.clone(),
132                        component_ids: request.component_ids.clone(),
133                        tvl_gt: request.tvl_gt,
134                        chain: request.chain,
135                        pagination: PaginationParams {
136                            page: index as i64,
137                            page_size: chunk_size as i64,
138                        },
139                    })
140                    .collect::<Vec<_>>();
141
142                let mut tasks = Vec::new();
143                for body in chunked_bodies.iter() {
144                    let sem = semaphore.clone();
145                    tasks.push(async move {
146                        let _permit = sem
147                            .acquire()
148                            .await
149                            .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
150                        self.get_protocol_components(body).await
151                    });
152                }
153
154                try_join_all(tasks)
155                    .await
156                    .map(|responses| ProtocolComponentRequestResponse {
157                        protocol_components: responses
158                            .into_iter()
159                            .flat_map(|r| r.protocol_components.into_iter())
160                            .collect(),
161                        pagination: PaginationResponse {
162                            page: 0,
163                            page_size: chunk_size as i64,
164                            total: ids.len() as i64,
165                        },
166                    })
167            }
168            _ => {
169                // If no component ids are specified, we need to make requests based on the total
170                // number of results from the first response.
171
172                let initial_request = ProtocolComponentsRequestBody {
173                    protocol_system: request.protocol_system.clone(),
174                    component_ids: request.component_ids.clone(),
175                    tvl_gt: request.tvl_gt,
176                    chain: request.chain,
177                    pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
178                };
179                let first_response = self
180                    .get_protocol_components(&initial_request)
181                    .await
182                    .map_err(|err| RPCError::Fatal(err.to_string()))?;
183
184                let total_items = first_response.pagination.total;
185                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
186
187                // Initialize the final response accumulator
188                let mut accumulated_response = ProtocolComponentRequestResponse {
189                    protocol_components: first_response.protocol_components,
190                    pagination: PaginationResponse {
191                        page: 0,
192                        page_size: chunk_size as i64,
193                        total: total_items,
194                    },
195                };
196
197                let mut page = 1;
198                while page < total_pages {
199                    let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
200
201                    // Create request bodies for parallel requests, respecting the concurrency limit
202                    let chunked_bodies = (0..requests_in_this_iteration)
203                        .map(|iter| ProtocolComponentsRequestBody {
204                            protocol_system: request.protocol_system.clone(),
205                            component_ids: request.component_ids.clone(),
206                            tvl_gt: request.tvl_gt,
207                            chain: request.chain,
208                            pagination: PaginationParams {
209                                page: page + iter,
210                                page_size: chunk_size as i64,
211                            },
212                        })
213                        .collect::<Vec<_>>();
214
215                    let tasks: Vec<_> = chunked_bodies
216                        .iter()
217                        .map(|body| {
218                            let sem = semaphore.clone();
219                            async move {
220                                let _permit = sem.acquire().await.map_err(|_| {
221                                    RPCError::Fatal("Semaphore dropped".to_string())
222                                })?;
223                                self.get_protocol_components(body).await
224                            }
225                        })
226                        .collect();
227
228                    let responses = try_join_all(tasks)
229                        .await
230                        .map(|responses| {
231                            let total = responses[0].pagination.total;
232                            ProtocolComponentRequestResponse {
233                                protocol_components: responses
234                                    .into_iter()
235                                    .flat_map(|r| r.protocol_components.into_iter())
236                                    .collect(),
237                                pagination: PaginationResponse {
238                                    page,
239                                    page_size: chunk_size as i64,
240                                    total,
241                                },
242                            }
243                        });
244
245                    // Update the accumulated response or set the initial response
246                    match responses {
247                        Ok(mut resp) => {
248                            accumulated_response
249                                .protocol_components
250                                .append(&mut resp.protocol_components);
251                        }
252                        Err(e) => return Err(e),
253                    }
254
255                    page += concurrency as i64;
256                }
257                Ok(accumulated_response)
258            }
259        }
260    }
261
    /// Retrieves protocol states matching `request`.
    ///
    /// Issues a single request and returns its response. To query many protocol ids in chunks,
    /// see `get_protocol_states_paginated`.
    async fn get_protocol_states(
        &self,
        request: &ProtocolStateRequestBody,
    ) -> Result<ProtocolStateRequestResponse, RPCError>;
266
267    #[allow(clippy::too_many_arguments)]
268    async fn get_protocol_states_paginated<T>(
269        &self,
270        chain: Chain,
271        ids: &[T],
272        protocol_system: &str,
273        include_balances: bool,
274        version: &VersionParam,
275        chunk_size: usize,
276        concurrency: usize,
277    ) -> Result<ProtocolStateRequestResponse, RPCError>
278    where
279        T: AsRef<str> + Sync + 'static,
280    {
281        let semaphore = Arc::new(Semaphore::new(concurrency));
282        let chunked_bodies = ids
283            .chunks(chunk_size)
284            .map(|c| ProtocolStateRequestBody {
285                protocol_ids: Some(
286                    c.iter()
287                        .map(|id| id.as_ref().to_string())
288                        .collect(),
289                ),
290                protocol_system: protocol_system.to_string(),
291                chain,
292                include_balances,
293                version: version.clone(),
294                pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
295            })
296            .collect::<Vec<_>>();
297
298        let mut tasks = Vec::new();
299        for body in chunked_bodies.iter() {
300            let sem = semaphore.clone();
301            tasks.push(async move {
302                let _permit = sem
303                    .acquire()
304                    .await
305                    .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
306                self.get_protocol_states(body).await
307            });
308        }
309
310        try_join_all(tasks)
311            .await
312            .map(|responses| {
313                let states = responses
314                    .clone()
315                    .into_iter()
316                    .flat_map(|r| r.states)
317                    .collect();
318                let total = responses
319                    .iter()
320                    .map(|r| r.pagination.total)
321                    .sum();
322                ProtocolStateRequestResponse {
323                    states,
324                    pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
325                }
326            })
327    }
328
    /// Retrieves a single page of tokens matching `request`.
    ///
    /// This function returns only one chunk of tokens, selected by `request.pagination`. To get
    /// all tokens please call `get_all_tokens`.
    async fn get_tokens(
        &self,
        request: &TokensRequestBody,
    ) -> Result<TokensRequestResponse, RPCError>;
335
336    async fn get_all_tokens(
337        &self,
338        chain: Chain,
339        min_quality: Option<i32>,
340        traded_n_days_ago: Option<u64>,
341        chunk_size: usize,
342    ) -> Result<Vec<ResponseToken>, RPCError> {
343        let mut request_page = 0;
344        let mut all_tokens = Vec::new();
345        loop {
346            let mut response = self
347                .get_tokens(&TokensRequestBody {
348                    token_addresses: None,
349                    min_quality,
350                    traded_n_days_ago,
351                    pagination: PaginationParams {
352                        page: request_page,
353                        page_size: chunk_size.try_into().map_err(|_| {
354                            RPCError::FormatRequest(
355                                "Failed to convert chunk_size into i64".to_string(),
356                            )
357                        })?,
358                    },
359                    chain,
360                })
361                .await?;
362
363            let num_tokens = response.tokens.len();
364            all_tokens.append(&mut response.tokens);
365            request_page += 1;
366
367            if num_tokens < chunk_size {
368                break;
369            }
370        }
371        Ok(all_tokens)
372    }
373
    /// Retrieves the protocol systems described by `request`.
    ///
    /// Issues a single request and returns its response.
    async fn get_protocol_systems(
        &self,
        request: &ProtocolSystemsRequestBody,
    ) -> Result<ProtocolSystemsRequestResponse, RPCError>;
378
    /// Retrieves component TVL values matching `request`.
    ///
    /// Issues a single request and therefore returns only the page selected by
    /// `request.pagination`. To collect all pages, see `get_component_tvl_paginated`.
    async fn get_component_tvl(
        &self,
        request: &ComponentTvlRequestBody,
    ) -> Result<ComponentTvlRequestResponse, RPCError>;
383
384    async fn get_component_tvl_paginated(
385        &self,
386        request: &ComponentTvlRequestBody,
387        chunk_size: usize,
388        concurrency: usize,
389    ) -> Result<ComponentTvlRequestResponse, RPCError> {
390        let semaphore = Arc::new(Semaphore::new(concurrency));
391
392        match request.component_ids {
393            Some(ref ids) => {
394                let chunked_requests = ids
395                    .chunks(chunk_size)
396                    .enumerate()
397                    .map(|(index, _)| ComponentTvlRequestBody {
398                        chain: request.chain,
399                        protocol_system: request.protocol_system.clone(),
400                        component_ids: Some(ids.clone()),
401                        pagination: PaginationParams {
402                            page: index as i64,
403                            page_size: chunk_size as i64,
404                        },
405                    })
406                    .collect::<Vec<_>>();
407
408                let tasks: Vec<_> = chunked_requests
409                    .into_iter()
410                    .map(|req| {
411                        let sem = semaphore.clone();
412                        async move {
413                            let _permit = sem
414                                .acquire()
415                                .await
416                                .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
417                            self.get_component_tvl(&req).await
418                        }
419                    })
420                    .collect();
421
422                let responses = try_join_all(tasks).await?;
423
424                let mut merged_tvl = HashMap::new();
425                for resp in responses {
426                    for (key, value) in resp.tvl {
427                        *merged_tvl.entry(key).or_insert(0.0) = value;
428                    }
429                }
430
431                Ok(ComponentTvlRequestResponse {
432                    tvl: merged_tvl,
433                    pagination: PaginationResponse {
434                        page: 0,
435                        page_size: chunk_size as i64,
436                        total: ids.len() as i64,
437                    },
438                })
439            }
440            _ => {
441                let first_request = ComponentTvlRequestBody {
442                    chain: request.chain,
443                    protocol_system: request.protocol_system.clone(),
444                    component_ids: request.component_ids.clone(),
445                    pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
446                };
447
448                let first_response = self
449                    .get_component_tvl(&first_request)
450                    .await?;
451                let total_items = first_response.pagination.total;
452                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
453
454                let mut merged_tvl = first_response.tvl;
455
456                let mut page = 1;
457                while page < total_pages {
458                    let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
459
460                    let chunked_requests: Vec<_> = (0..requests_in_this_iteration)
461                        .map(|i| ComponentTvlRequestBody {
462                            chain: request.chain,
463                            protocol_system: request.protocol_system.clone(),
464                            component_ids: request.component_ids.clone(),
465                            pagination: PaginationParams {
466                                page: page + i,
467                                page_size: chunk_size as i64,
468                            },
469                        })
470                        .collect();
471
472                    let tasks: Vec<_> = chunked_requests
473                        .into_iter()
474                        .map(|req| {
475                            let sem = semaphore.clone();
476                            async move {
477                                let _permit = sem.acquire().await.map_err(|_| {
478                                    RPCError::Fatal("Semaphore dropped".to_string())
479                                })?;
480                                self.get_component_tvl(&req).await
481                            }
482                        })
483                        .collect();
484
485                    let responses = try_join_all(tasks).await?;
486
487                    // merge hashmap
488                    for resp in responses {
489                        for (key, value) in resp.tvl {
490                            *merged_tvl.entry(key).or_insert(0.0) += value;
491                        }
492                    }
493
494                    page += concurrency as i64;
495                }
496
497                Ok(ComponentTvlRequestResponse {
498                    tvl: merged_tvl,
499                    pagination: PaginationResponse {
500                        page: 0,
501                        page_size: chunk_size as i64,
502                        total: total_items,
503                    },
504                })
505            }
506        }
507    }
508}
509
/// [`RPCClient`] implementation that talks to a Tycho server over HTTP.
#[derive(Debug, Clone)]
pub struct HttpRPCClient {
    /// HTTP client used for all requests; configured in [`HttpRPCClient::new`].
    http_client: Client,
    /// Base URL of the Tycho server; endpoint paths are appended to it.
    url: Url,
}
515
516impl HttpRPCClient {
517    pub fn new(base_uri: &str, auth_key: Option<&str>) -> Result<Self, RPCError> {
518        let uri = base_uri
519            .parse::<Url>()
520            .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
521
522        // Add default headers
523        let mut headers = header::HeaderMap::new();
524        headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
525        let user_agent = format!("tycho-client-{}", env!("CARGO_PKG_VERSION"));
526        headers.insert(
527            header::USER_AGENT,
528            header::HeaderValue::from_str(&user_agent).expect("invalid user agent format"),
529        );
530
531        // Add Authorization if one is given
532        if let Some(key) = auth_key {
533            let mut auth_value = header::HeaderValue::from_str(key).expect("invalid key format");
534            auth_value.set_sensitive(true);
535            headers.insert(header::AUTHORIZATION, auth_value);
536        }
537
538        let client = ClientBuilder::new()
539            .default_headers(headers)
540            .http2_prior_knowledge()
541            .build()
542            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
543        Ok(Self { http_client: client, url: uri })
544    }
545}
546
547#[async_trait]
548impl RPCClient for HttpRPCClient {
549    #[instrument(skip(self, request))]
550    async fn get_contract_state(
551        &self,
552        request: &StateRequestBody,
553    ) -> Result<StateRequestResponse, RPCError> {
554        // Check if contract ids are specified
555        if request.contract_ids.is_none() ||
556            request
557                .contract_ids
558                .as_ref()
559                .unwrap()
560                .is_empty()
561        {
562            warn!("No contract ids specified in request.");
563        }
564
565        let uri = format!(
566            "{}/{}/contract_state",
567            self.url
568                .to_string()
569                .trim_end_matches('/'),
570            TYCHO_SERVER_VERSION
571        );
572        debug!(%uri, "Sending contract_state request to Tycho server");
573        trace!(?request, "Sending request to Tycho server");
574
575        let response = self
576            .http_client
577            .post(&uri)
578            .json(request)
579            .send()
580            .await
581            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
582        trace!(?response, "Received response from Tycho server");
583
584        let body = response
585            .text()
586            .await
587            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
588        if body.is_empty() {
589            // Pure native protocols will return empty contract states
590            return Ok(StateRequestResponse {
591                accounts: vec![],
592                pagination: PaginationResponse {
593                    page: request.pagination.page,
594                    page_size: request.pagination.page,
595                    total: 0,
596                },
597            });
598        }
599
600        let accounts = serde_json::from_str::<StateRequestResponse>(&body)
601            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
602        trace!(?accounts, "Received contract_state response from Tycho server");
603
604        Ok(accounts)
605    }
606
607    async fn get_protocol_components(
608        &self,
609        request: &ProtocolComponentsRequestBody,
610    ) -> Result<ProtocolComponentRequestResponse, RPCError> {
611        let uri = format!(
612            "{}/{}/protocol_components",
613            self.url
614                .to_string()
615                .trim_end_matches('/'),
616            TYCHO_SERVER_VERSION,
617        );
618        debug!(%uri, "Sending protocol_components request to Tycho server");
619        trace!(?request, "Sending request to Tycho server");
620
621        let response = self
622            .http_client
623            .post(uri)
624            .header(header::CONTENT_TYPE, "application/json")
625            .json(request)
626            .send()
627            .await
628            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
629
630        trace!(?response, "Received response from Tycho server");
631
632        let body = response
633            .text()
634            .await
635            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
636        let components = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
637            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
638        trace!(?components, "Received protocol_components response from Tycho server");
639
640        Ok(components)
641    }
642
643    async fn get_protocol_states(
644        &self,
645        request: &ProtocolStateRequestBody,
646    ) -> Result<ProtocolStateRequestResponse, RPCError> {
647        // Check if contract ids are specified
648        if request.protocol_ids.is_none() ||
649            request
650                .protocol_ids
651                .as_ref()
652                .unwrap()
653                .is_empty()
654        {
655            warn!("No protocol ids specified in request.");
656        }
657
658        let uri = format!(
659            "{}/{}/protocol_state",
660            self.url
661                .to_string()
662                .trim_end_matches('/'),
663            TYCHO_SERVER_VERSION
664        );
665        debug!(%uri, "Sending protocol_states request to Tycho server");
666        trace!(?request, "Sending request to Tycho server");
667
668        let response = self
669            .http_client
670            .post(&uri)
671            .json(request)
672            .send()
673            .await
674            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
675        trace!(?response, "Received response from Tycho server");
676
677        let body = response
678            .text()
679            .await
680            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
681
682        if body.is_empty() {
683            // Pure VM protocols will return empty states
684            return Ok(ProtocolStateRequestResponse {
685                states: vec![],
686                pagination: PaginationResponse {
687                    page: request.pagination.page,
688                    page_size: request.pagination.page_size,
689                    total: 0,
690                },
691            });
692        }
693
694        let states = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
695            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
696        trace!(?states, "Received protocol_states response from Tycho server");
697
698        Ok(states)
699    }
700
701    async fn get_tokens(
702        &self,
703        request: &TokensRequestBody,
704    ) -> Result<TokensRequestResponse, RPCError> {
705        let uri = format!(
706            "{}/{}/tokens",
707            self.url
708                .to_string()
709                .trim_end_matches('/'),
710            TYCHO_SERVER_VERSION
711        );
712        debug!(%uri, "Sending tokens request to Tycho server");
713
714        let response = self
715            .http_client
716            .post(&uri)
717            .json(request)
718            .send()
719            .await
720            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
721
722        let body = response
723            .text()
724            .await
725            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
726        let tokens = serde_json::from_str::<TokensRequestResponse>(&body)
727            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
728
729        Ok(tokens)
730    }
731
732    async fn get_protocol_systems(
733        &self,
734        request: &ProtocolSystemsRequestBody,
735    ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
736        let uri = format!(
737            "{}/{}/protocol_systems",
738            self.url
739                .to_string()
740                .trim_end_matches('/'),
741            TYCHO_SERVER_VERSION
742        );
743        debug!(%uri, "Sending protocol_systems request to Tycho server");
744        trace!(?request, "Sending request to Tycho server");
745        let response = self
746            .http_client
747            .post(&uri)
748            .json(request)
749            .send()
750            .await
751            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
752        trace!(?response, "Received response from Tycho server");
753        let body = response
754            .text()
755            .await
756            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
757        let protocol_systems = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
758            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
759        trace!(?protocol_systems, "Received protocol_systems response from Tycho server");
760        Ok(protocol_systems)
761    }
762
763    async fn get_component_tvl(
764        &self,
765        request: &ComponentTvlRequestBody,
766    ) -> Result<ComponentTvlRequestResponse, RPCError> {
767        let uri = format!(
768            "{}/{}/component_tvl",
769            self.url
770                .to_string()
771                .trim_end_matches('/'),
772            TYCHO_SERVER_VERSION
773        );
774        debug!(%uri, "Sending get_component_tvl request to Tycho server");
775        trace!(?request, "Sending request to Tycho server");
776        let response = self
777            .http_client
778            .post(&uri)
779            .json(request)
780            .send()
781            .await
782            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
783        trace!(?response, "Received response from Tycho server");
784        let body = response
785            .text()
786            .await
787            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
788        let component_tvl =
789            serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
790                error!("Failed to parse component_tvl response: {:?}", &body);
791                RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
792            })?;
793        trace!(?component_tvl, "Received component_tvl response from Tycho server");
794        Ok(component_tvl)
795    }
796}
797
798#[cfg(test)]
799mod tests {
800    use std::{collections::HashMap, str::FromStr};
801
802    use mockito::Server;
803    use rstest::rstest;
804    // TODO: remove once deprecated ProtocolId struct is removed
805    #[allow(deprecated)]
806    use tycho_common::dto::ProtocolId;
807
808    use super::*;
809
810    // Dummy implementation of `get_protocol_states_paginated` for backwards compatibility testing
811    // purposes
812    impl MockRPCClient {
813        #[allow(clippy::too_many_arguments)]
814        async fn test_get_protocol_states_paginated<T>(
815            &self,
816            chain: Chain,
817            ids: &[T],
818            protocol_system: &str,
819            include_balances: bool,
820            version: &VersionParam,
821            chunk_size: usize,
822            _concurrency: usize,
823        ) -> Vec<ProtocolStateRequestBody>
824        where
825            T: AsRef<str> + Clone + Send + Sync + 'static,
826        {
827            ids.chunks(chunk_size)
828                .map(|chunk| ProtocolStateRequestBody {
829                    protocol_ids: Some(
830                        chunk
831                            .iter()
832                            .map(|id| id.as_ref().to_string())
833                            .collect(),
834                    ),
835                    protocol_system: protocol_system.to_string(),
836                    chain,
837                    include_balances,
838                    version: version.clone(),
839                    pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
840                })
841                .collect()
842        }
843    }
844
845    // TODO: remove once deprecated ProtocolId struct is removed
846    #[allow(deprecated)]
847    #[rstest]
848    #[case::protocol_id_input(vec![
849        ProtocolId { id: "id1".to_string(), chain: Chain::Ethereum },
850        ProtocolId { id: "id2".to_string(), chain: Chain::Ethereum }
851    ])]
852    #[case::string_input(vec![
853        "id1".to_string(),
854        "id2".to_string()
855    ])]
856    #[tokio::test]
857    async fn test_get_protocol_states_paginated_backwards_compatibility<T>(#[case] ids: Vec<T>)
858    where
859        T: AsRef<str> + Clone + Send + Sync + 'static,
860    {
861        let mock_client = MockRPCClient::new();
862
863        let request_bodies = mock_client
864            .test_get_protocol_states_paginated(
865                Chain::Ethereum,
866                &ids,
867                "test_system",
868                true,
869                &VersionParam::default(),
870                2,
871                2,
872            )
873            .await;
874
875        // Verify that the request bodies have been created correctly
876        assert_eq!(request_bodies.len(), 1);
877        assert_eq!(
878            request_bodies[0]
879                .protocol_ids
880                .as_ref()
881                .unwrap()
882                .len(),
883            2
884        );
885    }
886
887    #[tokio::test]
888    async fn test_get_contract_state() {
889        let mut server = Server::new_async().await;
890        let server_resp = r#"
891        {
892            "accounts": [
893                {
894                    "chain": "ethereum",
895                    "address": "0x0000000000000000000000000000000000000000",
896                    "title": "",
897                    "slots": {},
898                    "native_balance": "0x01f4",
899                    "token_balances": {},
900                    "code": "0x00",
901                    "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
902                    "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
903                    "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
904                    "creation_tx": null
905                }
906            ],
907            "pagination": {
908                "page": 0,
909                "page_size": 20,
910                "total": 10
911            }
912        }
913        "#;
914        // test that the response is deserialized correctly
915        serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
916
917        let mocked_server = server
918            .mock("POST", "/v1/contract_state")
919            .expect(1)
920            .with_body(server_resp)
921            .create_async()
922            .await;
923
924        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
925
926        let response = client
927            .get_contract_state(&Default::default())
928            .await
929            .expect("get state");
930        let accounts = response.accounts;
931
932        mocked_server.assert();
933        assert_eq!(accounts.len(), 1);
934        assert_eq!(accounts[0].slots, HashMap::new());
935        assert_eq!(accounts[0].native_balance, Bytes::from(500u16.to_be_bytes()));
936        assert_eq!(accounts[0].code, [0].to_vec());
937        assert_eq!(
938            accounts[0].code_hash,
939            hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
940                .unwrap()
941        );
942    }
943
944    #[tokio::test]
945    async fn test_get_protocol_components() {
946        let mut server = Server::new_async().await;
947        let server_resp = r#"
948        {
949            "protocol_components": [
950                {
951                    "id": "State1",
952                    "protocol_system": "ambient",
953                    "protocol_type_name": "Pool",
954                    "chain": "ethereum",
955                    "tokens": [
956                        "0x0000000000000000000000000000000000000000",
957                        "0x0000000000000000000000000000000000000001"
958                    ],
959                    "contract_ids": [
960                        "0x0000000000000000000000000000000000000000"
961                    ],
962                    "static_attributes": {
963                        "attribute_1": "0x00000000000003e8"
964                    },
965                    "change": "Creation",
966                    "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
967                    "created_at": "2022-01-01T00:00:00"
968                }
969            ],
970            "pagination": {
971                "page": 0,
972                "page_size": 20,
973                "total": 10
974            }
975        }
976        "#;
977        // test that the response is deserialized correctly
978        serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
979
980        let mocked_server = server
981            .mock("POST", "/v1/protocol_components")
982            .expect(1)
983            .with_body(server_resp)
984            .create_async()
985            .await;
986
987        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
988
989        let response = client
990            .get_protocol_components(&Default::default())
991            .await
992            .expect("get state");
993        let components = response.protocol_components;
994
995        mocked_server.assert();
996        assert_eq!(components.len(), 1);
997        assert_eq!(components[0].id, "State1");
998        assert_eq!(components[0].protocol_system, "ambient");
999        assert_eq!(components[0].protocol_type_name, "Pool");
1000        assert_eq!(components[0].tokens.len(), 2);
1001        let expected_attributes =
1002            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1003                .iter()
1004                .cloned()
1005                .collect::<HashMap<String, Bytes>>();
1006        assert_eq!(components[0].static_attributes, expected_attributes);
1007    }
1008
1009    #[tokio::test]
1010    async fn test_get_protocol_states() {
1011        let mut server = Server::new_async().await;
1012        let server_resp = r#"
1013        {
1014            "states": [
1015                {
1016                    "component_id": "State1",
1017                    "attributes": {
1018                        "attribute_1": "0x00000000000003e8"
1019                    },
1020                    "balances": {
1021                        "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
1022                    }
1023                }
1024            ],
1025            "pagination": {
1026                "page": 0,
1027                "page_size": 20,
1028                "total": 10
1029            }
1030        }
1031        "#;
1032        // test that the response is deserialized correctly
1033        serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
1034
1035        let mocked_server = server
1036            .mock("POST", "/v1/protocol_state")
1037            .expect(1)
1038            .with_body(server_resp)
1039            .create_async()
1040            .await;
1041        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1042
1043        let response = client
1044            .get_protocol_states(&Default::default())
1045            .await
1046            .expect("get state");
1047        let states = response.states;
1048
1049        mocked_server.assert();
1050        assert_eq!(states.len(), 1);
1051        assert_eq!(states[0].component_id, "State1");
1052        let expected_attributes =
1053            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1054                .iter()
1055                .cloned()
1056                .collect::<HashMap<String, Bytes>>();
1057        assert_eq!(states[0].attributes, expected_attributes);
1058        let expected_balances = [(
1059            Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
1060                .expect("Unsupported address format"),
1061            Bytes::from_str("0x01f4").unwrap(),
1062        )]
1063        .iter()
1064        .cloned()
1065        .collect::<HashMap<Bytes, Bytes>>();
1066        assert_eq!(states[0].balances, expected_balances);
1067    }
1068
1069    #[tokio::test]
1070    async fn test_get_tokens() {
1071        let mut server = Server::new_async().await;
1072        let server_resp = r#"
1073        {
1074            "tokens": [
1075              {
1076                "chain": "ethereum",
1077                "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
1078                "symbol": "WETH",
1079                "decimals": 18,
1080                "tax": 0,
1081                "gas": [
1082                  29962
1083                ],
1084                "quality": 100
1085              },
1086              {
1087                "chain": "ethereum",
1088                "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
1089                "symbol": "USDC",
1090                "decimals": 6,
1091                "tax": 0,
1092                "gas": [
1093                  40652
1094                ],
1095                "quality": 100
1096              }
1097            ],
1098            "pagination": {
1099              "page": 0,
1100              "page_size": 20,
1101              "total": 10
1102            }
1103          }
1104        "#;
1105        // test that the response is deserialized correctly
1106        serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
1107
1108        let mocked_server = server
1109            .mock("POST", "/v1/tokens")
1110            .expect(1)
1111            .with_body(server_resp)
1112            .create_async()
1113            .await;
1114        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1115
1116        let response = client
1117            .get_tokens(&Default::default())
1118            .await
1119            .expect("get tokens");
1120
1121        let expected = vec![
1122            ResponseToken {
1123                chain: Chain::Ethereum,
1124                address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
1125                symbol: "WETH".to_string(),
1126                decimals: 18,
1127                tax: 0,
1128                gas: vec![Some(29962)],
1129                quality: 100,
1130            },
1131            ResponseToken {
1132                chain: Chain::Ethereum,
1133                address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
1134                symbol: "USDC".to_string(),
1135                decimals: 6,
1136                tax: 0,
1137                gas: vec![Some(40652)],
1138                quality: 100,
1139            },
1140        ];
1141
1142        mocked_server.assert();
1143        assert_eq!(response.tokens, expected);
1144        assert_eq!(response.pagination, PaginationResponse { page: 0, page_size: 20, total: 10 });
1145    }
1146
1147    #[tokio::test]
1148    async fn test_get_protocol_systems() {
1149        let mut server = Server::new_async().await;
1150        let server_resp = r#"
1151        {
1152            "protocol_systems": [
1153                "system1",
1154                "system2"
1155            ],
1156            "pagination": {
1157                "page": 0,
1158                "page_size": 20,
1159                "total": 10
1160            }
1161        }
1162        "#;
1163        // test that the response is deserialized correctly
1164        serde_json::from_str::<ProtocolSystemsRequestResponse>(server_resp).expect("deserialize");
1165
1166        let mocked_server = server
1167            .mock("POST", "/v1/protocol_systems")
1168            .expect(1)
1169            .with_body(server_resp)
1170            .create_async()
1171            .await;
1172        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1173
1174        let response = client
1175            .get_protocol_systems(&Default::default())
1176            .await
1177            .expect("get protocol systems");
1178        let protocol_systems = response.protocol_systems;
1179
1180        mocked_server.assert();
1181        assert_eq!(protocol_systems, vec!["system1", "system2"]);
1182    }
1183
1184    #[tokio::test]
1185    async fn test_get_component_tvl() {
1186        let mut server = Server::new_async().await;
1187        let server_resp = r#"
1188        {
1189            "tvl": {
1190                "component1": 100.0
1191            },
1192            "pagination": {
1193                "page": 0,
1194                "page_size": 20,
1195                "total": 10
1196            }
1197        }
1198        "#;
1199        // test that the response is deserialized correctly
1200        serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
1201
1202        let mocked_server = server
1203            .mock("POST", "/v1/component_tvl")
1204            .expect(1)
1205            .with_body(server_resp)
1206            .create_async()
1207            .await;
1208        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1209
1210        let response = client
1211            .get_component_tvl(&Default::default())
1212            .await
1213            .expect("get protocol systems");
1214        let component_tvl = response.tvl;
1215
1216        mocked_server.assert();
1217        assert_eq!(component_tvl.get("component1"), Some(&100.0));
1218    }
1219}