tycho_client/
rpc.rs

//! # Tycho RPC Client
//!
//! The objective of this module is to provide swift and simplified access to the Remote Procedure
//! Call (RPC) endpoints of Tycho. These endpoints are chiefly responsible for facilitating data
//! queries, especially querying snapshots of data.
6use std::{collections::HashMap, sync::Arc};
7
8use async_trait::async_trait;
9use futures03::future::try_join_all;
10#[cfg(test)]
11use mockall::automock;
12use reqwest::{header, Client, ClientBuilder, Url};
13use thiserror::Error;
14use tokio::sync::Semaphore;
15use tracing::{debug, error, instrument, trace, warn};
16use tycho_common::{
17    dto::{
18        Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse, PaginationParams,
19        PaginationResponse, ProtocolComponentRequestResponse, ProtocolComponentsRequestBody,
20        ProtocolStateRequestBody, ProtocolStateRequestResponse, ProtocolSystemsRequestBody,
21        ProtocolSystemsRequestResponse, ResponseToken, StateRequestBody, StateRequestResponse,
22        TokensRequestBody, TokensRequestResponse, VersionParam,
23    },
24    Bytes,
25};
26
27use crate::TYCHO_SERVER_VERSION;
28
29#[derive(Error, Debug)]
30pub enum RPCError {
31    /// The passed tycho url failed to parse.
32    #[error("Failed to parse URL: {0}. Error: {1}")]
33    UrlParsing(String, String),
34
35    /// The request data is not correctly formed.
36    #[error("Failed to format request: {0}")]
37    FormatRequest(String),
38
39    /// Errors forwarded from the HTTP protocol.
40    #[error("Unexpected HTTP client error: {0}")]
41    HttpClient(String),
42
43    /// The response from the server could not be parsed correctly.
44    #[error("Failed to parse response: {0}")]
45    ParseResponse(String),
46
47    /// Other fatal errors.
48    #[error("Fatal error: {0}")]
49    Fatal(String),
50}
51
52#[cfg_attr(test, automock)]
53#[async_trait]
54pub trait RPCClient: Send + Sync {
55    /// Retrieves a snapshot of contract state.
56    async fn get_contract_state(
57        &self,
58        request: &StateRequestBody,
59    ) -> Result<StateRequestResponse, RPCError>;
60
61    async fn get_contract_state_paginated(
62        &self,
63        chain: Chain,
64        ids: &[Bytes],
65        protocol_system: &str,
66        version: &VersionParam,
67        chunk_size: usize,
68        concurrency: usize,
69    ) -> Result<StateRequestResponse, RPCError> {
70        let semaphore = Arc::new(Semaphore::new(concurrency));
71
72        let chunked_bodies = ids
73            .chunks(chunk_size)
74            .map(|chunk| StateRequestBody {
75                contract_ids: Some(chunk.to_vec()),
76                protocol_system: protocol_system.to_string(),
77                chain,
78                version: version.clone(),
79                pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
80            })
81            .collect::<Vec<_>>();
82
83        let mut tasks = Vec::new();
84        for body in chunked_bodies.iter() {
85            let sem = semaphore.clone();
86            tasks.push(async move {
87                let _permit = sem
88                    .acquire()
89                    .await
90                    .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
91                self.get_contract_state(body).await
92            });
93        }
94
95        // Execute all tasks concurrently with the defined concurrency limit.
96        let responses = try_join_all(tasks).await?;
97
98        // Aggregate the responses into a single result.
99        let accounts = responses
100            .iter()
101            .flat_map(|r| r.accounts.clone())
102            .collect();
103        let total: i64 = responses
104            .iter()
105            .map(|r| r.pagination.total)
106            .sum();
107
108        Ok(StateRequestResponse {
109            accounts,
110            pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
111        })
112    }
113
114    async fn get_protocol_components(
115        &self,
116        request: &ProtocolComponentsRequestBody,
117    ) -> Result<ProtocolComponentRequestResponse, RPCError>;
118
119    async fn get_protocol_components_paginated(
120        &self,
121        request: &ProtocolComponentsRequestBody,
122        chunk_size: usize,
123        concurrency: usize,
124    ) -> Result<ProtocolComponentRequestResponse, RPCError> {
125        let semaphore = Arc::new(Semaphore::new(concurrency));
126
127        // If a set of component IDs is specified, the maximum return size is already known,
128        // allowing us to pre-compute the number of requests to be made.
129        match request.component_ids {
130            Some(ref ids) => {
131                // We can divide the component_ids into chunks of size chunk_size
132                let chunked_bodies = ids
133                    .chunks(chunk_size)
134                    .enumerate()
135                    .map(|(index, _)| ProtocolComponentsRequestBody {
136                        protocol_system: request.protocol_system.clone(),
137                        component_ids: request.component_ids.clone(),
138                        tvl_gt: request.tvl_gt,
139                        chain: request.chain,
140                        pagination: PaginationParams {
141                            page: index as i64,
142                            page_size: chunk_size as i64,
143                        },
144                    })
145                    .collect::<Vec<_>>();
146
147                let mut tasks = Vec::new();
148                for body in chunked_bodies.iter() {
149                    let sem = semaphore.clone();
150                    tasks.push(async move {
151                        let _permit = sem
152                            .acquire()
153                            .await
154                            .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
155                        self.get_protocol_components(body).await
156                    });
157                }
158
159                try_join_all(tasks)
160                    .await
161                    .map(|responses| ProtocolComponentRequestResponse {
162                        protocol_components: responses
163                            .into_iter()
164                            .flat_map(|r| r.protocol_components.into_iter())
165                            .collect(),
166                        pagination: PaginationResponse {
167                            page: 0,
168                            page_size: chunk_size as i64,
169                            total: ids.len() as i64,
170                        },
171                    })
172            }
173            _ => {
174                // If no component ids are specified, we need to make requests based on the total
175                // number of results from the first response.
176
177                let initial_request = ProtocolComponentsRequestBody {
178                    protocol_system: request.protocol_system.clone(),
179                    component_ids: request.component_ids.clone(),
180                    tvl_gt: request.tvl_gt,
181                    chain: request.chain,
182                    pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
183                };
184                let first_response = self
185                    .get_protocol_components(&initial_request)
186                    .await
187                    .map_err(|err| RPCError::Fatal(err.to_string()))?;
188
189                let total_items = first_response.pagination.total;
190                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
191
192                // Initialize the final response accumulator
193                let mut accumulated_response = ProtocolComponentRequestResponse {
194                    protocol_components: first_response.protocol_components,
195                    pagination: PaginationResponse {
196                        page: 0,
197                        page_size: chunk_size as i64,
198                        total: total_items,
199                    },
200                };
201
202                let mut page = 1;
203                while page < total_pages {
204                    let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
205
206                    // Create request bodies for parallel requests, respecting the concurrency limit
207                    let chunked_bodies = (0..requests_in_this_iteration)
208                        .map(|iter| ProtocolComponentsRequestBody {
209                            protocol_system: request.protocol_system.clone(),
210                            component_ids: request.component_ids.clone(),
211                            tvl_gt: request.tvl_gt,
212                            chain: request.chain,
213                            pagination: PaginationParams {
214                                page: page + iter,
215                                page_size: chunk_size as i64,
216                            },
217                        })
218                        .collect::<Vec<_>>();
219
220                    let tasks: Vec<_> = chunked_bodies
221                        .iter()
222                        .map(|body| {
223                            let sem = semaphore.clone();
224                            async move {
225                                let _permit = sem.acquire().await.map_err(|_| {
226                                    RPCError::Fatal("Semaphore dropped".to_string())
227                                })?;
228                                self.get_protocol_components(body).await
229                            }
230                        })
231                        .collect();
232
233                    let responses = try_join_all(tasks)
234                        .await
235                        .map(|responses| {
236                            let total = responses[0].pagination.total;
237                            ProtocolComponentRequestResponse {
238                                protocol_components: responses
239                                    .into_iter()
240                                    .flat_map(|r| r.protocol_components.into_iter())
241                                    .collect(),
242                                pagination: PaginationResponse {
243                                    page,
244                                    page_size: chunk_size as i64,
245                                    total,
246                                },
247                            }
248                        });
249
250                    // Update the accumulated response or set the initial response
251                    match responses {
252                        Ok(mut resp) => {
253                            accumulated_response
254                                .protocol_components
255                                .append(&mut resp.protocol_components);
256                        }
257                        Err(e) => return Err(e),
258                    }
259
260                    page += concurrency as i64;
261                }
262                Ok(accumulated_response)
263            }
264        }
265    }
266
267    async fn get_protocol_states(
268        &self,
269        request: &ProtocolStateRequestBody,
270    ) -> Result<ProtocolStateRequestResponse, RPCError>;
271
272    #[allow(clippy::too_many_arguments)]
273    async fn get_protocol_states_paginated<T>(
274        &self,
275        chain: Chain,
276        ids: &[T],
277        protocol_system: &str,
278        include_balances: bool,
279        version: &VersionParam,
280        chunk_size: usize,
281        concurrency: usize,
282    ) -> Result<ProtocolStateRequestResponse, RPCError>
283    where
284        T: AsRef<str> + Sync + 'static,
285    {
286        let semaphore = Arc::new(Semaphore::new(concurrency));
287        let chunked_bodies = ids
288            .chunks(chunk_size)
289            .map(|c| ProtocolStateRequestBody {
290                protocol_ids: Some(
291                    c.iter()
292                        .map(|id| id.as_ref().to_string())
293                        .collect(),
294                ),
295                protocol_system: protocol_system.to_string(),
296                chain,
297                include_balances,
298                version: version.clone(),
299                pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
300            })
301            .collect::<Vec<_>>();
302
303        let mut tasks = Vec::new();
304        for body in chunked_bodies.iter() {
305            let sem = semaphore.clone();
306            tasks.push(async move {
307                let _permit = sem
308                    .acquire()
309                    .await
310                    .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
311                self.get_protocol_states(body).await
312            });
313        }
314
315        try_join_all(tasks)
316            .await
317            .map(|responses| {
318                let states = responses
319                    .clone()
320                    .into_iter()
321                    .flat_map(|r| r.states)
322                    .collect();
323                let total = responses
324                    .iter()
325                    .map(|r| r.pagination.total)
326                    .sum();
327                ProtocolStateRequestResponse {
328                    states,
329                    pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
330                }
331            })
332    }
333
334    /// This function returns only one chunk of tokens. To get all tokens please call
335    /// get_all_tokens.
336    async fn get_tokens(
337        &self,
338        request: &TokensRequestBody,
339    ) -> Result<TokensRequestResponse, RPCError>;
340
341    async fn get_all_tokens(
342        &self,
343        chain: Chain,
344        min_quality: Option<i32>,
345        traded_n_days_ago: Option<u64>,
346        chunk_size: usize,
347    ) -> Result<Vec<ResponseToken>, RPCError> {
348        let mut request_page = 0;
349        let mut all_tokens = Vec::new();
350        loop {
351            let mut response = self
352                .get_tokens(&TokensRequestBody {
353                    token_addresses: None,
354                    min_quality,
355                    traded_n_days_ago,
356                    pagination: PaginationParams {
357                        page: request_page,
358                        page_size: chunk_size.try_into().map_err(|_| {
359                            RPCError::FormatRequest(
360                                "Failed to convert chunk_size into i64".to_string(),
361                            )
362                        })?,
363                    },
364                    chain,
365                })
366                .await?;
367
368            let num_tokens = response.tokens.len();
369            all_tokens.append(&mut response.tokens);
370            request_page += 1;
371
372            if num_tokens < chunk_size {
373                break;
374            }
375        }
376        Ok(all_tokens)
377    }
378
379    async fn get_protocol_systems(
380        &self,
381        request: &ProtocolSystemsRequestBody,
382    ) -> Result<ProtocolSystemsRequestResponse, RPCError>;
383
384    async fn get_component_tvl(
385        &self,
386        request: &ComponentTvlRequestBody,
387    ) -> Result<ComponentTvlRequestResponse, RPCError>;
388
389    async fn get_component_tvl_paginated(
390        &self,
391        request: &ComponentTvlRequestBody,
392        chunk_size: usize,
393        concurrency: usize,
394    ) -> Result<ComponentTvlRequestResponse, RPCError> {
395        let semaphore = Arc::new(Semaphore::new(concurrency));
396
397        match request.component_ids {
398            Some(ref ids) => {
399                let chunked_requests = ids
400                    .chunks(chunk_size)
401                    .enumerate()
402                    .map(|(index, _)| ComponentTvlRequestBody {
403                        chain: request.chain,
404                        protocol_system: request.protocol_system.clone(),
405                        component_ids: Some(ids.clone()),
406                        pagination: PaginationParams {
407                            page: index as i64,
408                            page_size: chunk_size as i64,
409                        },
410                    })
411                    .collect::<Vec<_>>();
412
413                let tasks: Vec<_> = chunked_requests
414                    .into_iter()
415                    .map(|req| {
416                        let sem = semaphore.clone();
417                        async move {
418                            let _permit = sem
419                                .acquire()
420                                .await
421                                .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
422                            self.get_component_tvl(&req).await
423                        }
424                    })
425                    .collect();
426
427                let responses = try_join_all(tasks).await?;
428
429                let mut merged_tvl = HashMap::new();
430                for resp in responses {
431                    for (key, value) in resp.tvl {
432                        *merged_tvl.entry(key).or_insert(0.0) = value;
433                    }
434                }
435
436                Ok(ComponentTvlRequestResponse {
437                    tvl: merged_tvl,
438                    pagination: PaginationResponse {
439                        page: 0,
440                        page_size: chunk_size as i64,
441                        total: ids.len() as i64,
442                    },
443                })
444            }
445            _ => {
446                let first_request = ComponentTvlRequestBody {
447                    chain: request.chain,
448                    protocol_system: request.protocol_system.clone(),
449                    component_ids: request.component_ids.clone(),
450                    pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
451                };
452
453                let first_response = self
454                    .get_component_tvl(&first_request)
455                    .await?;
456                let total_items = first_response.pagination.total;
457                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
458
459                let mut merged_tvl = first_response.tvl;
460
461                let mut page = 1;
462                while page < total_pages {
463                    let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
464
465                    let chunked_requests: Vec<_> = (0..requests_in_this_iteration)
466                        .map(|i| ComponentTvlRequestBody {
467                            chain: request.chain,
468                            protocol_system: request.protocol_system.clone(),
469                            component_ids: request.component_ids.clone(),
470                            pagination: PaginationParams {
471                                page: page + i,
472                                page_size: chunk_size as i64,
473                            },
474                        })
475                        .collect();
476
477                    let tasks: Vec<_> = chunked_requests
478                        .into_iter()
479                        .map(|req| {
480                            let sem = semaphore.clone();
481                            async move {
482                                let _permit = sem.acquire().await.map_err(|_| {
483                                    RPCError::Fatal("Semaphore dropped".to_string())
484                                })?;
485                                self.get_component_tvl(&req).await
486                            }
487                        })
488                        .collect();
489
490                    let responses = try_join_all(tasks).await?;
491
492                    // merge hashmap
493                    for resp in responses {
494                        for (key, value) in resp.tvl {
495                            *merged_tvl.entry(key).or_insert(0.0) += value;
496                        }
497                    }
498
499                    page += concurrency as i64;
500                }
501
502                Ok(ComponentTvlRequestResponse {
503                    tvl: merged_tvl,
504                    pagination: PaginationResponse {
505                        page: 0,
506                        page_size: chunk_size as i64,
507                        total: total_items,
508                    },
509                })
510            }
511        }
512    }
513}
514
515#[derive(Debug, Clone)]
516pub struct HttpRPCClient {
517    http_client: Client,
518    url: Url,
519}
520
521impl HttpRPCClient {
522    pub fn new(base_uri: &str, auth_key: Option<&str>) -> Result<Self, RPCError> {
523        let uri = base_uri
524            .parse::<Url>()
525            .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
526
527        // Add default headers
528        let mut headers = header::HeaderMap::new();
529        headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
530        let user_agent = format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION"));
531        headers.insert(
532            header::USER_AGENT,
533            header::HeaderValue::from_str(&user_agent)
534                .map_err(|e| RPCError::FormatRequest(format!("Invalid user agent format: {e}")))?,
535        );
536
537        // Add Authorization if one is given
538        if let Some(key) = auth_key {
539            let mut auth_value = header::HeaderValue::from_str(key).map_err(|e| {
540                RPCError::FormatRequest(format!("Invalid authorization key format: {e}"))
541            })?;
542            auth_value.set_sensitive(true);
543            headers.insert(header::AUTHORIZATION, auth_value);
544        }
545
546        let client = ClientBuilder::new()
547            .default_headers(headers)
548            .http2_prior_knowledge()
549            .build()
550            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
551        Ok(Self { http_client: client, url: uri })
552    }
553}
554
555#[async_trait]
556impl RPCClient for HttpRPCClient {
557    #[instrument(skip(self, request))]
558    async fn get_contract_state(
559        &self,
560        request: &StateRequestBody,
561    ) -> Result<StateRequestResponse, RPCError> {
562        // Check if contract ids are specified
563        if request
564            .contract_ids
565            .as_ref()
566            .is_none_or(|ids| ids.is_empty())
567        {
568            warn!("No contract ids specified in request.");
569        }
570
571        let uri = format!(
572            "{}/{}/contract_state",
573            self.url
574                .to_string()
575                .trim_end_matches('/'),
576            TYCHO_SERVER_VERSION
577        );
578        debug!(%uri, "Sending contract_state request to Tycho server");
579        trace!(?request, "Sending request to Tycho server");
580
581        let response = self
582            .http_client
583            .post(&uri)
584            .json(request)
585            .send()
586            .await
587            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
588        trace!(?response, "Received response from Tycho server");
589
590        let body = response
591            .text()
592            .await
593            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
594        if body.is_empty() {
595            // Pure native protocols will return empty contract states
596            return Ok(StateRequestResponse {
597                accounts: vec![],
598                pagination: PaginationResponse {
599                    page: request.pagination.page,
600                    page_size: request.pagination.page,
601                    total: 0,
602                },
603            });
604        }
605
606        let accounts = serde_json::from_str::<StateRequestResponse>(&body)
607            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
608        trace!(?accounts, "Received contract_state response from Tycho server");
609
610        Ok(accounts)
611    }
612
613    async fn get_protocol_components(
614        &self,
615        request: &ProtocolComponentsRequestBody,
616    ) -> Result<ProtocolComponentRequestResponse, RPCError> {
617        let uri = format!(
618            "{}/{}/protocol_components",
619            self.url
620                .to_string()
621                .trim_end_matches('/'),
622            TYCHO_SERVER_VERSION,
623        );
624        debug!(%uri, "Sending protocol_components request to Tycho server");
625        trace!(?request, "Sending request to Tycho server");
626
627        let response = self
628            .http_client
629            .post(uri)
630            .header(header::CONTENT_TYPE, "application/json")
631            .json(request)
632            .send()
633            .await
634            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
635
636        trace!(?response, "Received response from Tycho server");
637
638        let body = response
639            .text()
640            .await
641            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
642        let components = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
643            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
644        trace!(?components, "Received protocol_components response from Tycho server");
645
646        Ok(components)
647    }
648
649    async fn get_protocol_states(
650        &self,
651        request: &ProtocolStateRequestBody,
652    ) -> Result<ProtocolStateRequestResponse, RPCError> {
653        // Check if protocol ids are specified
654        if request
655            .protocol_ids
656            .as_ref()
657            .is_none_or(|ids| ids.is_empty())
658        {
659            warn!("No protocol ids specified in request.");
660        }
661
662        let uri = format!(
663            "{}/{}/protocol_state",
664            self.url
665                .to_string()
666                .trim_end_matches('/'),
667            TYCHO_SERVER_VERSION
668        );
669        debug!(%uri, "Sending protocol_states request to Tycho server");
670        trace!(?request, "Sending request to Tycho server");
671
672        let response = self
673            .http_client
674            .post(&uri)
675            .json(request)
676            .send()
677            .await
678            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
679        trace!(?response, "Received response from Tycho server");
680
681        let body = response
682            .text()
683            .await
684            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
685
686        if body.is_empty() {
687            // Pure VM protocols will return empty states
688            return Ok(ProtocolStateRequestResponse {
689                states: vec![],
690                pagination: PaginationResponse {
691                    page: request.pagination.page,
692                    page_size: request.pagination.page_size,
693                    total: 0,
694                },
695            });
696        }
697
698        let states = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
699            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
700        trace!(?states, "Received protocol_states response from Tycho server");
701
702        Ok(states)
703    }
704
705    async fn get_tokens(
706        &self,
707        request: &TokensRequestBody,
708    ) -> Result<TokensRequestResponse, RPCError> {
709        let uri = format!(
710            "{}/{}/tokens",
711            self.url
712                .to_string()
713                .trim_end_matches('/'),
714            TYCHO_SERVER_VERSION
715        );
716        debug!(%uri, "Sending tokens request to Tycho server");
717
718        let response = self
719            .http_client
720            .post(&uri)
721            .json(request)
722            .send()
723            .await
724            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
725
726        let body = response
727            .text()
728            .await
729            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
730        let tokens = serde_json::from_str::<TokensRequestResponse>(&body)
731            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
732
733        Ok(tokens)
734    }
735
736    async fn get_protocol_systems(
737        &self,
738        request: &ProtocolSystemsRequestBody,
739    ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
740        let uri = format!(
741            "{}/{}/protocol_systems",
742            self.url
743                .to_string()
744                .trim_end_matches('/'),
745            TYCHO_SERVER_VERSION
746        );
747        debug!(%uri, "Sending protocol_systems request to Tycho server");
748        trace!(?request, "Sending request to Tycho server");
749        let response = self
750            .http_client
751            .post(&uri)
752            .json(request)
753            .send()
754            .await
755            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
756        trace!(?response, "Received response from Tycho server");
757        let body = response
758            .text()
759            .await
760            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
761        let protocol_systems = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
762            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
763        trace!(?protocol_systems, "Received protocol_systems response from Tycho server");
764        Ok(protocol_systems)
765    }
766
767    async fn get_component_tvl(
768        &self,
769        request: &ComponentTvlRequestBody,
770    ) -> Result<ComponentTvlRequestResponse, RPCError> {
771        let uri = format!(
772            "{}/{}/component_tvl",
773            self.url
774                .to_string()
775                .trim_end_matches('/'),
776            TYCHO_SERVER_VERSION
777        );
778        debug!(%uri, "Sending get_component_tvl request to Tycho server");
779        trace!(?request, "Sending request to Tycho server");
780        let response = self
781            .http_client
782            .post(&uri)
783            .json(request)
784            .send()
785            .await
786            .map_err(|e| RPCError::HttpClient(e.to_string()))?;
787        trace!(?response, "Received response from Tycho server");
788        let body = response
789            .text()
790            .await
791            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
792        let component_tvl =
793            serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
794                error!("Failed to parse component_tvl response: {:?}", &body);
795                RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
796            })?;
797        trace!(?component_tvl, "Received component_tvl response from Tycho server");
798        Ok(component_tvl)
799    }
800}
801
802#[cfg(test)]
803mod tests {
804    use std::{collections::HashMap, str::FromStr};
805
806    use mockito::Server;
807    use rstest::rstest;
808    // TODO: remove once deprecated ProtocolId struct is removed
809    #[allow(deprecated)]
810    use tycho_common::dto::ProtocolId;
811
812    use super::*;
813
814    // Dummy implementation of `get_protocol_states_paginated` for backwards compatibility testing
815    // purposes
816    impl MockRPCClient {
817        #[allow(clippy::too_many_arguments)]
818        async fn test_get_protocol_states_paginated<T>(
819            &self,
820            chain: Chain,
821            ids: &[T],
822            protocol_system: &str,
823            include_balances: bool,
824            version: &VersionParam,
825            chunk_size: usize,
826            _concurrency: usize,
827        ) -> Vec<ProtocolStateRequestBody>
828        where
829            T: AsRef<str> + Clone + Send + Sync + 'static,
830        {
831            ids.chunks(chunk_size)
832                .map(|chunk| ProtocolStateRequestBody {
833                    protocol_ids: Some(
834                        chunk
835                            .iter()
836                            .map(|id| id.as_ref().to_string())
837                            .collect(),
838                    ),
839                    protocol_system: protocol_system.to_string(),
840                    chain,
841                    include_balances,
842                    version: version.clone(),
843                    pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
844                })
845                .collect()
846        }
847    }
848
849    // TODO: remove once deprecated ProtocolId struct is removed
850    #[allow(deprecated)]
851    #[rstest]
852    #[case::protocol_id_input(vec![
853        ProtocolId { id: "id1".to_string(), chain: Chain::Ethereum },
854        ProtocolId { id: "id2".to_string(), chain: Chain::Ethereum }
855    ])]
856    #[case::string_input(vec![
857        "id1".to_string(),
858        "id2".to_string()
859    ])]
860    #[tokio::test]
861    async fn test_get_protocol_states_paginated_backwards_compatibility<T>(#[case] ids: Vec<T>)
862    where
863        T: AsRef<str> + Clone + Send + Sync + 'static,
864    {
865        let mock_client = MockRPCClient::new();
866
867        let request_bodies = mock_client
868            .test_get_protocol_states_paginated(
869                Chain::Ethereum,
870                &ids,
871                "test_system",
872                true,
873                &VersionParam::default(),
874                2,
875                2,
876            )
877            .await;
878
879        // Verify that the request bodies have been created correctly
880        assert_eq!(request_bodies.len(), 1);
881        assert_eq!(
882            request_bodies[0]
883                .protocol_ids
884                .as_ref()
885                .unwrap()
886                .len(),
887            2
888        );
889    }
890
891    #[tokio::test]
892    async fn test_get_contract_state() {
893        let mut server = Server::new_async().await;
894        let server_resp = r#"
895        {
896            "accounts": [
897                {
898                    "chain": "ethereum",
899                    "address": "0x0000000000000000000000000000000000000000",
900                    "title": "",
901                    "slots": {},
902                    "native_balance": "0x01f4",
903                    "token_balances": {},
904                    "code": "0x00",
905                    "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
906                    "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
907                    "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
908                    "creation_tx": null
909                }
910            ],
911            "pagination": {
912                "page": 0,
913                "page_size": 20,
914                "total": 10
915            }
916        }
917        "#;
918        // test that the response is deserialized correctly
919        serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
920
921        let mocked_server = server
922            .mock("POST", "/v1/contract_state")
923            .expect(1)
924            .with_body(server_resp)
925            .create_async()
926            .await;
927
928        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
929
930        let response = client
931            .get_contract_state(&Default::default())
932            .await
933            .expect("get state");
934        let accounts = response.accounts;
935
936        mocked_server.assert();
937        assert_eq!(accounts.len(), 1);
938        assert_eq!(accounts[0].slots, HashMap::new());
939        assert_eq!(accounts[0].native_balance, Bytes::from(500u16.to_be_bytes()));
940        assert_eq!(accounts[0].code, [0].to_vec());
941        assert_eq!(
942            accounts[0].code_hash,
943            hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
944                .unwrap()
945        );
946    }
947
948    #[tokio::test]
949    async fn test_get_protocol_components() {
950        let mut server = Server::new_async().await;
951        let server_resp = r#"
952        {
953            "protocol_components": [
954                {
955                    "id": "State1",
956                    "protocol_system": "ambient",
957                    "protocol_type_name": "Pool",
958                    "chain": "ethereum",
959                    "tokens": [
960                        "0x0000000000000000000000000000000000000000",
961                        "0x0000000000000000000000000000000000000001"
962                    ],
963                    "contract_ids": [
964                        "0x0000000000000000000000000000000000000000"
965                    ],
966                    "static_attributes": {
967                        "attribute_1": "0x00000000000003e8"
968                    },
969                    "change": "Creation",
970                    "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
971                    "created_at": "2022-01-01T00:00:00"
972                }
973            ],
974            "pagination": {
975                "page": 0,
976                "page_size": 20,
977                "total": 10
978            }
979        }
980        "#;
981        // test that the response is deserialized correctly
982        serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
983
984        let mocked_server = server
985            .mock("POST", "/v1/protocol_components")
986            .expect(1)
987            .with_body(server_resp)
988            .create_async()
989            .await;
990
991        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
992
993        let response = client
994            .get_protocol_components(&Default::default())
995            .await
996            .expect("get state");
997        let components = response.protocol_components;
998
999        mocked_server.assert();
1000        assert_eq!(components.len(), 1);
1001        assert_eq!(components[0].id, "State1");
1002        assert_eq!(components[0].protocol_system, "ambient");
1003        assert_eq!(components[0].protocol_type_name, "Pool");
1004        assert_eq!(components[0].tokens.len(), 2);
1005        let expected_attributes =
1006            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1007                .iter()
1008                .cloned()
1009                .collect::<HashMap<String, Bytes>>();
1010        assert_eq!(components[0].static_attributes, expected_attributes);
1011    }
1012
1013    #[tokio::test]
1014    async fn test_get_protocol_states() {
1015        let mut server = Server::new_async().await;
1016        let server_resp = r#"
1017        {
1018            "states": [
1019                {
1020                    "component_id": "State1",
1021                    "attributes": {
1022                        "attribute_1": "0x00000000000003e8"
1023                    },
1024                    "balances": {
1025                        "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
1026                    }
1027                }
1028            ],
1029            "pagination": {
1030                "page": 0,
1031                "page_size": 20,
1032                "total": 10
1033            }
1034        }
1035        "#;
1036        // test that the response is deserialized correctly
1037        serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
1038
1039        let mocked_server = server
1040            .mock("POST", "/v1/protocol_state")
1041            .expect(1)
1042            .with_body(server_resp)
1043            .create_async()
1044            .await;
1045        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1046
1047        let response = client
1048            .get_protocol_states(&Default::default())
1049            .await
1050            .expect("get state");
1051        let states = response.states;
1052
1053        mocked_server.assert();
1054        assert_eq!(states.len(), 1);
1055        assert_eq!(states[0].component_id, "State1");
1056        let expected_attributes =
1057            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1058                .iter()
1059                .cloned()
1060                .collect::<HashMap<String, Bytes>>();
1061        assert_eq!(states[0].attributes, expected_attributes);
1062        let expected_balances = [(
1063            Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
1064                .expect("Unsupported address format"),
1065            Bytes::from_str("0x01f4").unwrap(),
1066        )]
1067        .iter()
1068        .cloned()
1069        .collect::<HashMap<Bytes, Bytes>>();
1070        assert_eq!(states[0].balances, expected_balances);
1071    }
1072
1073    #[tokio::test]
1074    async fn test_get_tokens() {
1075        let mut server = Server::new_async().await;
1076        let server_resp = r#"
1077        {
1078            "tokens": [
1079              {
1080                "chain": "ethereum",
1081                "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
1082                "symbol": "WETH",
1083                "decimals": 18,
1084                "tax": 0,
1085                "gas": [
1086                  29962
1087                ],
1088                "quality": 100
1089              },
1090              {
1091                "chain": "ethereum",
1092                "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
1093                "symbol": "USDC",
1094                "decimals": 6,
1095                "tax": 0,
1096                "gas": [
1097                  40652
1098                ],
1099                "quality": 100
1100              }
1101            ],
1102            "pagination": {
1103              "page": 0,
1104              "page_size": 20,
1105              "total": 10
1106            }
1107          }
1108        "#;
1109        // test that the response is deserialized correctly
1110        serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
1111
1112        let mocked_server = server
1113            .mock("POST", "/v1/tokens")
1114            .expect(1)
1115            .with_body(server_resp)
1116            .create_async()
1117            .await;
1118        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1119
1120        let response = client
1121            .get_tokens(&Default::default())
1122            .await
1123            .expect("get tokens");
1124
1125        let expected = vec![
1126            ResponseToken {
1127                chain: Chain::Ethereum,
1128                address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
1129                symbol: "WETH".to_string(),
1130                decimals: 18,
1131                tax: 0,
1132                gas: vec![Some(29962)],
1133                quality: 100,
1134            },
1135            ResponseToken {
1136                chain: Chain::Ethereum,
1137                address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
1138                symbol: "USDC".to_string(),
1139                decimals: 6,
1140                tax: 0,
1141                gas: vec![Some(40652)],
1142                quality: 100,
1143            },
1144        ];
1145
1146        mocked_server.assert();
1147        assert_eq!(response.tokens, expected);
1148        assert_eq!(response.pagination, PaginationResponse { page: 0, page_size: 20, total: 10 });
1149    }
1150
1151    #[tokio::test]
1152    async fn test_get_protocol_systems() {
1153        let mut server = Server::new_async().await;
1154        let server_resp = r#"
1155        {
1156            "protocol_systems": [
1157                "system1",
1158                "system2"
1159            ],
1160            "pagination": {
1161                "page": 0,
1162                "page_size": 20,
1163                "total": 10
1164            }
1165        }
1166        "#;
1167        // test that the response is deserialized correctly
1168        serde_json::from_str::<ProtocolSystemsRequestResponse>(server_resp).expect("deserialize");
1169
1170        let mocked_server = server
1171            .mock("POST", "/v1/protocol_systems")
1172            .expect(1)
1173            .with_body(server_resp)
1174            .create_async()
1175            .await;
1176        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1177
1178        let response = client
1179            .get_protocol_systems(&Default::default())
1180            .await
1181            .expect("get protocol systems");
1182        let protocol_systems = response.protocol_systems;
1183
1184        mocked_server.assert();
1185        assert_eq!(protocol_systems, vec!["system1", "system2"]);
1186    }
1187
1188    #[tokio::test]
1189    async fn test_get_component_tvl() {
1190        let mut server = Server::new_async().await;
1191        let server_resp = r#"
1192        {
1193            "tvl": {
1194                "component1": 100.0
1195            },
1196            "pagination": {
1197                "page": 0,
1198                "page_size": 20,
1199                "total": 10
1200            }
1201        }
1202        "#;
1203        // test that the response is deserialized correctly
1204        serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
1205
1206        let mocked_server = server
1207            .mock("POST", "/v1/component_tvl")
1208            .expect(1)
1209            .with_body(server_resp)
1210            .create_async()
1211            .await;
1212        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1213
1214        let response = client
1215            .get_component_tvl(&Default::default())
1216            .await
1217            .expect("get protocol systems");
1218        let component_tvl = response.tvl;
1219
1220        mocked_server.assert();
1221        assert_eq!(component_tvl.get("component1"), Some(&100.0));
1222    }
1223}