tycho_client/
rpc.rs

1//! # Tycho RPC Client
2//!
3//! The objective of this module is to provide swift and simplified access to the Remote Procedure
4//! Call (RPC) endpoints of Tycho. These endpoints are chiefly responsible for facilitating data
5//! queries, especially querying snapshots of data.
6use std::{
7    collections::HashMap,
8    sync::Arc,
9    time::{Duration, SystemTime},
10};
11
12use async_trait::async_trait;
13use backoff::{exponential::ExponentialBackoffBuilder, ExponentialBackoff};
14use futures03::future::try_join_all;
15#[cfg(test)]
16use mockall::automock;
17use reqwest::{header, Client, ClientBuilder, Response, StatusCode, Url};
18use serde::Serialize;
19use thiserror::Error;
20use time::{format_description::well_known::Rfc2822, OffsetDateTime};
21use tokio::{
22    sync::{RwLock, Semaphore},
23    time::sleep,
24};
25use tracing::{debug, error, instrument, trace, warn};
26use tycho_common::{
27    dto::{
28        BlockParam, Chain, ComponentTvlRequestBody, ComponentTvlRequestResponse,
29        EntryPointWithTracingParams, PaginationParams, PaginationResponse, ProtocolComponent,
30        ProtocolComponentRequestResponse, ProtocolComponentsRequestBody, ProtocolStateRequestBody,
31        ProtocolStateRequestResponse, ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse,
32        ResponseToken, StateRequestBody, StateRequestResponse, TokensRequestBody,
33        TokensRequestResponse, TracedEntryPointRequestBody, TracedEntryPointRequestResponse,
34        TracingResult, VersionParam,
35    },
36    models::ComponentId,
37    Bytes,
38};
39
40use crate::{
41    feed::synchronizer::{ComponentWithState, Snapshot},
42    TYCHO_SERVER_VERSION,
43};
44
/// Request body for fetching a snapshot of protocol states and VM storage.
///
/// This struct helps to coordinate fetching multiple pieces of related data
/// (protocol states, contract storage, TVL, entry points).
///
/// All collections are borrowed for the lifetime `'a`, so the value is cheap to clone.
/// [`SnapshotParameters::new`] enables `include_balances` and `include_tvl` and leaves
/// `entrypoints` unset.
#[derive(Clone, Debug, PartialEq)]
pub struct SnapshotParameters<'a> {
    /// Which chain to fetch snapshots for
    pub chain: Chain,
    /// Protocol system name, required for correct state resolution
    pub protocol_system: &'a str,
    /// Components to fetch protocol states for, keyed by component id
    pub components: &'a HashMap<ComponentId, ProtocolComponent>,
    /// Traced entry points data mapped by component id (`None` when no entry points are
    /// attached)
    pub entrypoints: Option<&'a HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>>>,
    /// Contract addresses to fetch VM storage for
    pub contract_ids: &'a [Bytes],
    /// Block number for versioning
    pub block_number: u64,
    /// Whether to include balance information
    pub include_balances: bool,
    /// Whether to fetch TVL data
    pub include_tvl: bool,
}
68
69impl<'a> SnapshotParameters<'a> {
70    pub fn new(
71        chain: Chain,
72        protocol_system: &'a str,
73        components: &'a HashMap<ComponentId, ProtocolComponent>,
74        contract_ids: &'a [Bytes],
75        block_number: u64,
76    ) -> Self {
77        Self {
78            chain,
79            protocol_system,
80            components,
81            entrypoints: None,
82            contract_ids,
83            block_number,
84            include_balances: true,
85            include_tvl: true,
86        }
87    }
88
89    /// Set whether to include balance information (default: true)
90    pub fn include_balances(mut self, include_balances: bool) -> Self {
91        self.include_balances = include_balances;
92        self
93    }
94
95    /// Set whether to fetch TVL data (default: true)
96    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
97        self.include_tvl = include_tvl;
98        self
99    }
100
101    pub fn entrypoints(
102        mut self,
103        entrypoints: &'a HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>>,
104    ) -> Self {
105        self.entrypoints = Some(entrypoints);
106        self
107    }
108}
109
/// Errors produced by the RPC client.
#[derive(Error, Debug)]
pub enum RPCError {
    /// The passed tycho url failed to parse.
    ///
    /// Holds the offending URL and the parser's error message.
    #[error("Failed to parse URL: {0}. Error: {1}")]
    UrlParsing(String, String),

    /// The request data is not correctly formed.
    #[error("Failed to format request: {0}")]
    FormatRequest(String),

    /// Errors forwarded from the HTTP protocol.
    ///
    /// Holds a message and the underlying `reqwest::Error`, which is exposed as the error
    /// source.
    #[error("Unexpected HTTP client error: {0}")]
    HttpClient(String, #[source] reqwest::Error),

    /// The response from the server could not be parsed correctly.
    #[error("Failed to parse response: {0}")]
    ParseResponse(String),

    /// Other fatal errors.
    #[error("Fatal error: {0}")]
    Fatal(String),

    /// The server answered with HTTP 429 (Too Many Requests).
    ///
    /// Holds the point in time parsed from the `Retry-After` header, or `None` when the
    /// header was absent or could not be parsed.
    #[error("Rate limited until {0:?}")]
    RateLimited(Option<SystemTime>),

    /// The server answered with HTTP 502, 503 or 504.
    ///
    /// Holds the response body, or a fixed placeholder text when the body could not be read.
    #[error("Server unreachable: {0}")]
    ServerUnreachable(String),
}
138
139#[cfg_attr(test, automock)]
140#[async_trait]
141pub trait RPCClient: Send + Sync {
    /// Retrieves a snapshot of contract state.
    ///
    /// Required method. The default [`RPCClient::get_contract_state_paginated`] calls it once
    /// per chunk of contract ids.
    async fn get_contract_state(
        &self,
        request: &StateRequestBody,
    ) -> Result<StateRequestResponse, RPCError>;
147
148    async fn get_contract_state_paginated(
149        &self,
150        chain: Chain,
151        ids: &[Bytes],
152        protocol_system: &str,
153        version: &VersionParam,
154        chunk_size: usize,
155        concurrency: usize,
156    ) -> Result<StateRequestResponse, RPCError> {
157        let semaphore = Arc::new(Semaphore::new(concurrency));
158
159        // Sort the ids to maximize server-side cache hits
160        let mut sorted_ids = ids.to_vec();
161        sorted_ids.sort();
162
163        let chunked_bodies = sorted_ids
164            .chunks(chunk_size)
165            .map(|chunk| StateRequestBody {
166                contract_ids: Some(chunk.to_vec()),
167                protocol_system: protocol_system.to_string(),
168                chain,
169                version: version.clone(),
170                pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
171            })
172            .collect::<Vec<_>>();
173
174        let mut tasks = Vec::new();
175        for body in chunked_bodies.iter() {
176            let sem = semaphore.clone();
177            tasks.push(async move {
178                let _permit = sem
179                    .acquire()
180                    .await
181                    .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
182                self.get_contract_state(body).await
183            });
184        }
185
186        // Execute all tasks concurrently with the defined concurrency limit.
187        let responses = try_join_all(tasks).await?;
188
189        // Aggregate the responses into a single result.
190        let accounts = responses
191            .iter()
192            .flat_map(|r| r.accounts.clone())
193            .collect();
194        let total: i64 = responses
195            .iter()
196            .map(|r| r.pagination.total)
197            .sum();
198
199        Ok(StateRequestResponse {
200            accounts,
201            pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
202        })
203    }
204
    /// Answers a single protocol components request, i.e. one page as selected by
    /// `request.pagination`.
    ///
    /// Required method. The default [`RPCClient::get_protocol_components_paginated`] calls it
    /// once per page.
    async fn get_protocol_components(
        &self,
        request: &ProtocolComponentsRequestBody,
    ) -> Result<ProtocolComponentRequestResponse, RPCError>;
209
210    async fn get_protocol_components_paginated(
211        &self,
212        request: &ProtocolComponentsRequestBody,
213        chunk_size: usize,
214        concurrency: usize,
215    ) -> Result<ProtocolComponentRequestResponse, RPCError> {
216        let semaphore = Arc::new(Semaphore::new(concurrency));
217
218        // If a set of component IDs is specified, the maximum return size is already known,
219        // allowing us to pre-compute the number of requests to be made.
220        match request.component_ids {
221            Some(ref ids) => {
222                // We can divide the component_ids into chunks of size chunk_size
223                let chunked_bodies = ids
224                    .chunks(chunk_size)
225                    .enumerate()
226                    .map(|(index, _)| ProtocolComponentsRequestBody {
227                        protocol_system: request.protocol_system.clone(),
228                        component_ids: request.component_ids.clone(),
229                        tvl_gt: request.tvl_gt,
230                        chain: request.chain,
231                        pagination: PaginationParams {
232                            page: index as i64,
233                            page_size: chunk_size as i64,
234                        },
235                    })
236                    .collect::<Vec<_>>();
237
238                let mut tasks = Vec::new();
239                for body in chunked_bodies.iter() {
240                    let sem = semaphore.clone();
241                    tasks.push(async move {
242                        let _permit = sem
243                            .acquire()
244                            .await
245                            .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
246                        self.get_protocol_components(body).await
247                    });
248                }
249
250                try_join_all(tasks)
251                    .await
252                    .map(|responses| ProtocolComponentRequestResponse {
253                        protocol_components: responses
254                            .into_iter()
255                            .flat_map(|r| r.protocol_components.into_iter())
256                            .collect(),
257                        pagination: PaginationResponse {
258                            page: 0,
259                            page_size: chunk_size as i64,
260                            total: ids.len() as i64,
261                        },
262                    })
263            }
264            _ => {
265                // If no component ids are specified, we need to make requests based on the total
266                // number of results from the first response.
267
268                let initial_request = ProtocolComponentsRequestBody {
269                    protocol_system: request.protocol_system.clone(),
270                    component_ids: request.component_ids.clone(),
271                    tvl_gt: request.tvl_gt,
272                    chain: request.chain,
273                    pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
274                };
275                let first_response = self
276                    .get_protocol_components(&initial_request)
277                    .await
278                    .map_err(|err| RPCError::Fatal(err.to_string()))?;
279
280                let total_items = first_response.pagination.total;
281                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
282
283                // Initialize the final response accumulator
284                let mut accumulated_response = ProtocolComponentRequestResponse {
285                    protocol_components: first_response.protocol_components,
286                    pagination: PaginationResponse {
287                        page: 0,
288                        page_size: chunk_size as i64,
289                        total: total_items,
290                    },
291                };
292
293                let mut page = 1;
294                while page < total_pages {
295                    let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
296
297                    // Create request bodies for parallel requests, respecting the concurrency limit
298                    let chunked_bodies = (0..requests_in_this_iteration)
299                        .map(|iter| ProtocolComponentsRequestBody {
300                            protocol_system: request.protocol_system.clone(),
301                            component_ids: request.component_ids.clone(),
302                            tvl_gt: request.tvl_gt,
303                            chain: request.chain,
304                            pagination: PaginationParams {
305                                page: page + iter,
306                                page_size: chunk_size as i64,
307                            },
308                        })
309                        .collect::<Vec<_>>();
310
311                    let tasks: Vec<_> = chunked_bodies
312                        .iter()
313                        .map(|body| {
314                            let sem = semaphore.clone();
315                            async move {
316                                let _permit = sem.acquire().await.map_err(|_| {
317                                    RPCError::Fatal("Semaphore dropped".to_string())
318                                })?;
319                                self.get_protocol_components(body).await
320                            }
321                        })
322                        .collect();
323
324                    let responses = try_join_all(tasks)
325                        .await
326                        .map(|responses| {
327                            let total = responses[0].pagination.total;
328                            ProtocolComponentRequestResponse {
329                                protocol_components: responses
330                                    .into_iter()
331                                    .flat_map(|r| r.protocol_components.into_iter())
332                                    .collect(),
333                                pagination: PaginationResponse {
334                                    page,
335                                    page_size: chunk_size as i64,
336                                    total,
337                                },
338                            }
339                        });
340
341                    // Update the accumulated response or set the initial response
342                    match responses {
343                        Ok(mut resp) => {
344                            accumulated_response
345                                .protocol_components
346                                .append(&mut resp.protocol_components);
347                        }
348                        Err(e) => return Err(e),
349                    }
350
351                    page += concurrency as i64;
352                }
353                Ok(accumulated_response)
354            }
355        }
356    }
357
    /// Answers a single protocol state request.
    ///
    /// Required method. The default [`RPCClient::get_protocol_states_paginated`] calls it once
    /// per chunk of protocol ids.
    async fn get_protocol_states(
        &self,
        request: &ProtocolStateRequestBody,
    ) -> Result<ProtocolStateRequestResponse, RPCError>;
362
363    #[allow(clippy::too_many_arguments)]
364    async fn get_protocol_states_paginated<T>(
365        &self,
366        chain: Chain,
367        ids: &[T],
368        protocol_system: &str,
369        include_balances: bool,
370        version: &VersionParam,
371        chunk_size: usize,
372        concurrency: usize,
373    ) -> Result<ProtocolStateRequestResponse, RPCError>
374    where
375        T: AsRef<str> + Sync + 'static,
376    {
377        let semaphore = Arc::new(Semaphore::new(concurrency));
378        let chunked_bodies = ids
379            .chunks(chunk_size)
380            .map(|c| ProtocolStateRequestBody {
381                protocol_ids: Some(
382                    c.iter()
383                        .map(|id| id.as_ref().to_string())
384                        .collect(),
385                ),
386                protocol_system: protocol_system.to_string(),
387                chain,
388                include_balances,
389                version: version.clone(),
390                pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
391            })
392            .collect::<Vec<_>>();
393
394        let mut tasks = Vec::new();
395        for body in chunked_bodies.iter() {
396            let sem = semaphore.clone();
397            tasks.push(async move {
398                let _permit = sem
399                    .acquire()
400                    .await
401                    .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
402                self.get_protocol_states(body).await
403            });
404        }
405
406        try_join_all(tasks)
407            .await
408            .map(|responses| {
409                let states = responses
410                    .clone()
411                    .into_iter()
412                    .flat_map(|r| r.states)
413                    .collect();
414                let total = responses
415                    .iter()
416                    .map(|r| r.pagination.total)
417                    .sum();
418                ProtocolStateRequestResponse {
419                    states,
420                    pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
421                }
422            })
423    }
424
    /// Returns only one chunk (page) of tokens, as selected by `request.pagination`.
    ///
    /// To get all tokens, call [`RPCClient::get_all_tokens`], which invokes this method page
    /// after page.
    async fn get_tokens(
        &self,
        request: &TokensRequestBody,
    ) -> Result<TokensRequestResponse, RPCError>;
431
432    async fn get_all_tokens(
433        &self,
434        chain: Chain,
435        min_quality: Option<i32>,
436        traded_n_days_ago: Option<u64>,
437        chunk_size: usize,
438    ) -> Result<Vec<ResponseToken>, RPCError> {
439        let mut request_page = 0;
440        let mut all_tokens = Vec::new();
441        loop {
442            let mut response = self
443                .get_tokens(&TokensRequestBody {
444                    token_addresses: None,
445                    min_quality,
446                    traded_n_days_ago,
447                    pagination: PaginationParams {
448                        page: request_page,
449                        page_size: chunk_size.try_into().map_err(|_| {
450                            RPCError::FormatRequest(
451                                "Failed to convert chunk_size into i64".to_string(),
452                            )
453                        })?,
454                    },
455                    chain,
456                })
457                .await?;
458
459            let num_tokens = response.tokens.len();
460            all_tokens.append(&mut response.tokens);
461            request_page += 1;
462
463            if num_tokens < chunk_size {
464                break;
465            }
466        }
467        Ok(all_tokens)
468    }
469
    /// Answers a single protocol systems request.
    ///
    /// Required method; this trait provides no paginated helper for it.
    async fn get_protocol_systems(
        &self,
        request: &ProtocolSystemsRequestBody,
    ) -> Result<ProtocolSystemsRequestResponse, RPCError>;
474
    /// Answers a single component TVL request, i.e. one page as selected by
    /// `request.pagination`.
    ///
    /// Required method. The default [`RPCClient::get_component_tvl_paginated`] calls it once
    /// per page.
    async fn get_component_tvl(
        &self,
        request: &ComponentTvlRequestBody,
    ) -> Result<ComponentTvlRequestResponse, RPCError>;
479
480    async fn get_component_tvl_paginated(
481        &self,
482        request: &ComponentTvlRequestBody,
483        chunk_size: usize,
484        concurrency: usize,
485    ) -> Result<ComponentTvlRequestResponse, RPCError> {
486        let semaphore = Arc::new(Semaphore::new(concurrency));
487
488        match request.component_ids {
489            Some(ref ids) => {
490                let chunked_requests = ids
491                    .chunks(chunk_size)
492                    .enumerate()
493                    .map(|(index, _)| ComponentTvlRequestBody {
494                        chain: request.chain,
495                        protocol_system: request.protocol_system.clone(),
496                        component_ids: Some(ids.clone()),
497                        pagination: PaginationParams {
498                            page: index as i64,
499                            page_size: chunk_size as i64,
500                        },
501                    })
502                    .collect::<Vec<_>>();
503
504                let tasks: Vec<_> = chunked_requests
505                    .into_iter()
506                    .map(|req| {
507                        let sem = semaphore.clone();
508                        async move {
509                            let _permit = sem
510                                .acquire()
511                                .await
512                                .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
513                            self.get_component_tvl(&req).await
514                        }
515                    })
516                    .collect();
517
518                let responses = try_join_all(tasks).await?;
519
520                let mut merged_tvl = HashMap::new();
521                for resp in responses {
522                    for (key, value) in resp.tvl {
523                        *merged_tvl.entry(key).or_insert(0.0) = value;
524                    }
525                }
526
527                Ok(ComponentTvlRequestResponse {
528                    tvl: merged_tvl,
529                    pagination: PaginationResponse {
530                        page: 0,
531                        page_size: chunk_size as i64,
532                        total: ids.len() as i64,
533                    },
534                })
535            }
536            _ => {
537                let first_request = ComponentTvlRequestBody {
538                    chain: request.chain,
539                    protocol_system: request.protocol_system.clone(),
540                    component_ids: request.component_ids.clone(),
541                    pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
542                };
543
544                let first_response = self
545                    .get_component_tvl(&first_request)
546                    .await?;
547                let total_items = first_response.pagination.total;
548                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
549
550                let mut merged_tvl = first_response.tvl;
551
552                let mut page = 1;
553                while page < total_pages {
554                    let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
555
556                    let chunked_requests: Vec<_> = (0..requests_in_this_iteration)
557                        .map(|i| ComponentTvlRequestBody {
558                            chain: request.chain,
559                            protocol_system: request.protocol_system.clone(),
560                            component_ids: request.component_ids.clone(),
561                            pagination: PaginationParams {
562                                page: page + i,
563                                page_size: chunk_size as i64,
564                            },
565                        })
566                        .collect();
567
568                    let tasks: Vec<_> = chunked_requests
569                        .into_iter()
570                        .map(|req| {
571                            let sem = semaphore.clone();
572                            async move {
573                                let _permit = sem.acquire().await.map_err(|_| {
574                                    RPCError::Fatal("Semaphore dropped".to_string())
575                                })?;
576                                self.get_component_tvl(&req).await
577                            }
578                        })
579                        .collect();
580
581                    let responses = try_join_all(tasks).await?;
582
583                    // merge hashmap
584                    for resp in responses {
585                        for (key, value) in resp.tvl {
586                            *merged_tvl.entry(key).or_insert(0.0) += value;
587                        }
588                    }
589
590                    page += concurrency as i64;
591                }
592
593                Ok(ComponentTvlRequestResponse {
594                    tvl: merged_tvl,
595                    pagination: PaginationResponse {
596                        page: 0,
597                        page_size: chunk_size as i64,
598                        total: total_items,
599                    },
600                })
601            }
602        }
603    }
604
    /// Answers a single traced entry points request.
    ///
    /// Required method. The default [`RPCClient::get_traced_entry_points_paginated`] calls it
    /// once per chunk of component ids.
    async fn get_traced_entry_points(
        &self,
        request: &TracedEntryPointRequestBody,
    ) -> Result<TracedEntryPointRequestResponse, RPCError>;
609
610    async fn get_traced_entry_points_paginated(
611        &self,
612        chain: Chain,
613        protocol_system: &str,
614        component_ids: &[String],
615        chunk_size: usize,
616        concurrency: usize,
617    ) -> Result<TracedEntryPointRequestResponse, RPCError> {
618        let semaphore = Arc::new(Semaphore::new(concurrency));
619        let chunked_bodies = component_ids
620            .chunks(chunk_size)
621            .map(|c| TracedEntryPointRequestBody {
622                chain,
623                protocol_system: protocol_system.to_string(),
624                component_ids: Some(c.to_vec()),
625                pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
626            })
627            .collect::<Vec<_>>();
628
629        let mut tasks = Vec::new();
630        for body in chunked_bodies.iter() {
631            let sem = semaphore.clone();
632            tasks.push(async move {
633                let _permit = sem
634                    .acquire()
635                    .await
636                    .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
637                self.get_traced_entry_points(body).await
638            });
639        }
640
641        try_join_all(tasks)
642            .await
643            .map(|responses| {
644                let traced_entry_points = responses
645                    .clone()
646                    .into_iter()
647                    .flat_map(|r| r.traced_entry_points)
648                    .collect();
649                let total = responses
650                    .iter()
651                    .map(|r| r.pagination.total)
652                    .sum();
653                TracedEntryPointRequestResponse {
654                    traced_entry_points,
655                    pagination: PaginationResponse { page: 0, page_size: chunk_size as i64, total },
656                }
657            })
658    }
659
    /// Builds a [`Snapshot`] for the data described by `request`.
    ///
    /// Required method without a default implementation.
    // NOTE(review): `chunk_size` and `concurrency` presumably bound the size and parallelism
    // of the underlying requests, as in the paginated helpers of this trait; confirm against
    // the implementations.
    async fn get_snapshots<'a>(
        &self,
        request: &SnapshotParameters<'a>,
        chunk_size: usize,
        concurrency: usize,
    ) -> Result<Snapshot, RPCError>;
666}
667
/// HTTP client state used for POST requests with retry and rate-limit handling.
#[derive(Debug, Clone)]
pub struct HttpRPCClient {
    /// Underlying HTTP client, carrying the default headers configured in
    /// [`HttpRPCClient::new`].
    http_client: Client,
    /// Base URL parsed from the `base_uri` passed to [`HttpRPCClient::new`].
    url: Url,
    /// Latest rate-limit deadline received so far. Clones of the client share this value
    /// through the `Arc`.
    retry_after: Arc<RwLock<Option<SystemTime>>>,
    /// Backoff policy applied to requests made through `make_post_request`.
    backoff_policy: ExponentialBackoff,
    /// Delay before a request is retried after a 502, 503 or 504 response.
    server_restart_duration: Duration,
}
676
677impl HttpRPCClient {
678    pub fn new(base_uri: &str, auth_key: Option<&str>) -> Result<Self, RPCError> {
679        let uri = base_uri
680            .parse::<Url>()
681            .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
682
683        // Add default headers
684        let mut headers = header::HeaderMap::new();
685        headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
686        let user_agent = format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION"));
687        headers.insert(
688            header::USER_AGENT,
689            header::HeaderValue::from_str(&user_agent)
690                .map_err(|e| RPCError::FormatRequest(format!("Invalid user agent format: {e}")))?,
691        );
692
693        // Add Authorization if one is given
694        if let Some(key) = auth_key {
695            let mut auth_value = header::HeaderValue::from_str(key).map_err(|e| {
696                RPCError::FormatRequest(format!("Invalid authorization key format: {e}"))
697            })?;
698            auth_value.set_sensitive(true);
699            headers.insert(header::AUTHORIZATION, auth_value);
700        }
701
702        let client = ClientBuilder::new()
703            .default_headers(headers)
704            .http2_prior_knowledge()
705            .build()
706            .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
707        Ok(Self {
708            http_client: client,
709            url: uri,
710            retry_after: Arc::new(RwLock::new(None)),
711            backoff_policy: ExponentialBackoffBuilder::new()
712                .with_initial_interval(Duration::from_millis(250))
713                // increase backoff time by 75% each failure
714                .with_multiplier(1.75)
715                // keep retrying every 30s
716                .with_max_interval(Duration::from_secs(30))
717                // if all retries take longer than 2m, give up
718                .with_max_elapsed_time(Some(Duration::from_secs(125)))
719                .build(),
720            server_restart_duration: Duration::from_secs(120),
721        })
722    }
723
724    #[cfg(test)]
725    pub fn with_test_backoff_policy(mut self) -> Self {
726        // Extremely short intervals for very fast testing
727        self.backoff_policy = ExponentialBackoffBuilder::new()
728            .with_initial_interval(Duration::from_millis(1))
729            .with_multiplier(1.1)
730            .with_max_interval(Duration::from_millis(5))
731            .with_max_elapsed_time(Some(Duration::from_millis(50)))
732            .build();
733        self.server_restart_duration = Duration::from_millis(50);
734        self
735    }
736
737    /// Converts a error response to a Result.
738    ///
739    /// Raises an error if the response status code id 429, 502, 503 or 504. In the 429
740    /// case it will try to look for a retry-after header an parse it accordingly. The
741    /// parsed value is then passed as part of the error.
742    async fn error_for_response(
743        &self,
744        response: reqwest::Response,
745    ) -> Result<reqwest::Response, RPCError> {
746        match response.status() {
747            StatusCode::TOO_MANY_REQUESTS => {
748                let retry_after_raw = response
749                    .headers()
750                    .get(reqwest::header::RETRY_AFTER)
751                    .and_then(|h| h.to_str().ok())
752                    .and_then(parse_retry_value);
753
754                Err(RPCError::RateLimited(retry_after_raw))
755            }
756            StatusCode::BAD_GATEWAY |
757            StatusCode::SERVICE_UNAVAILABLE |
758            StatusCode::GATEWAY_TIMEOUT => Err(RPCError::ServerUnreachable(
759                response
760                    .text()
761                    .await
762                    .unwrap_or_else(|_| "Server Unreachable".to_string()),
763            )),
764            _ => Ok(response),
765        }
766    }
767
768    /// Classifies errors into transient or permanent ones.
769    ///
770    /// Transient errors are retried with a potential backoff, permanent ones are not.
771    /// If the error is RateLimited, this method will set the self.retry_after value so
772    /// future requests wait until the rate limit has been reset.
773    async fn handle_error_for_backoff(&self, e: RPCError) -> backoff::Error<RPCError> {
774        match e {
775            RPCError::ServerUnreachable(_) => {
776                backoff::Error::retry_after(e, self.server_restart_duration)
777            }
778            RPCError::RateLimited(Some(until)) => {
779                let mut retry_after_guard = self.retry_after.write().await;
780                *retry_after_guard = Some(
781                    retry_after_guard
782                        .unwrap_or(until)
783                        .max(until),
784                );
785
786                if let Ok(duration) = until.duration_since(SystemTime::now()) {
787                    backoff::Error::retry_after(e, duration)
788                } else {
789                    e.into()
790                }
791            }
792            RPCError::RateLimited(None) => e.into(),
793            _ => backoff::Error::permanent(e),
794        }
795    }
796
797    /// Waits until the current rate limit time has passed.
798    ///
799    /// Only waits if there is a time and that time is in the future, else return
800    /// immediately.
801    async fn wait_until_retry_after(&self) {
802        if let Some(&until) = self.retry_after.read().await.as_ref() {
803            let now = SystemTime::now();
804            if until > now {
805                if let Ok(duration) = until.duration_since(now) {
806                    sleep(duration).await
807                }
808            }
809        }
810    }
811
812    /// Makes a post request handling transient failures.
813    ///
814    /// If a retry-after header is received it will be respected. Else the configured
815    /// backoff policy is used to deal with transient network or server errors.
816    async fn make_post_request<T: Serialize + ?Sized>(
817        &self,
818        request: &T,
819        uri: &String,
820    ) -> Result<Response, RPCError> {
821        self.wait_until_retry_after().await;
822        let response = backoff::future::retry(self.backoff_policy.clone(), || async {
823            let server_response = self
824                .http_client
825                .post(uri)
826                .json(request)
827                .send()
828                .await
829                .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
830
831            match self
832                .error_for_response(server_response)
833                .await
834            {
835                Ok(response) => Ok(response),
836                Err(e) => Err(self.handle_error_for_backoff(e).await),
837            }
838        })
839        .await?;
840        Ok(response)
841    }
842}
843
844fn parse_retry_value(val: &str) -> Option<SystemTime> {
845    if let Ok(secs) = val.parse::<u64>() {
846        return Some(SystemTime::now() + Duration::from_secs(secs));
847    }
848    if let Ok(date) = OffsetDateTime::parse(val, &Rfc2822) {
849        return Some(date.into());
850    }
851    None
852}
853
854#[async_trait]
855impl RPCClient for HttpRPCClient {
856    #[instrument(skip(self, request))]
857    async fn get_contract_state(
858        &self,
859        request: &StateRequestBody,
860    ) -> Result<StateRequestResponse, RPCError> {
861        // Check if contract ids are specified
862        if request
863            .contract_ids
864            .as_ref()
865            .is_none_or(|ids| ids.is_empty())
866        {
867            warn!("No contract ids specified in request.");
868        }
869
870        let uri = format!(
871            "{}/{}/contract_state",
872            self.url
873                .to_string()
874                .trim_end_matches('/'),
875            TYCHO_SERVER_VERSION
876        );
877        debug!(%uri, "Sending contract_state request to Tycho server");
878        trace!(?request, "Sending request to Tycho server");
879        let response = self
880            .make_post_request(request, &uri)
881            .await?;
882        trace!(?response, "Received response from Tycho server");
883
884        let body = response
885            .text()
886            .await
887            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
888        if body.is_empty() {
889            // Pure native protocols will return empty contract states
890            return Ok(StateRequestResponse {
891                accounts: vec![],
892                pagination: PaginationResponse {
893                    page: request.pagination.page,
894                    page_size: request.pagination.page,
895                    total: 0,
896                },
897            });
898        }
899
900        let accounts = serde_json::from_str::<StateRequestResponse>(&body)
901            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
902        trace!(?accounts, "Received contract_state response from Tycho server");
903
904        Ok(accounts)
905    }
906
907    async fn get_protocol_components(
908        &self,
909        request: &ProtocolComponentsRequestBody,
910    ) -> Result<ProtocolComponentRequestResponse, RPCError> {
911        let uri = format!(
912            "{}/{}/protocol_components",
913            self.url
914                .to_string()
915                .trim_end_matches('/'),
916            TYCHO_SERVER_VERSION,
917        );
918        debug!(%uri, "Sending protocol_components request to Tycho server");
919        trace!(?request, "Sending request to Tycho server");
920
921        let response = self
922            .make_post_request(request, &uri)
923            .await?;
924
925        trace!(?response, "Received response from Tycho server");
926
927        let body = response
928            .text()
929            .await
930            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
931        let components = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
932            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
933        trace!(?components, "Received protocol_components response from Tycho server");
934
935        Ok(components)
936    }
937
938    async fn get_protocol_states(
939        &self,
940        request: &ProtocolStateRequestBody,
941    ) -> Result<ProtocolStateRequestResponse, RPCError> {
942        // Check if protocol ids are specified
943        if request
944            .protocol_ids
945            .as_ref()
946            .is_none_or(|ids| ids.is_empty())
947        {
948            warn!("No protocol ids specified in request.");
949        }
950
951        let uri = format!(
952            "{}/{}/protocol_state",
953            self.url
954                .to_string()
955                .trim_end_matches('/'),
956            TYCHO_SERVER_VERSION
957        );
958        debug!(%uri, "Sending protocol_states request to Tycho server");
959        trace!(?request, "Sending request to Tycho server");
960
961        let response = self
962            .make_post_request(request, &uri)
963            .await?;
964        trace!(?response, "Received response from Tycho server");
965
966        let body = response
967            .text()
968            .await
969            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
970
971        if body.is_empty() {
972            // Pure VM protocols will return empty states
973            return Ok(ProtocolStateRequestResponse {
974                states: vec![],
975                pagination: PaginationResponse {
976                    page: request.pagination.page,
977                    page_size: request.pagination.page_size,
978                    total: 0,
979                },
980            });
981        }
982
983        let states = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
984            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
985        trace!(?states, "Received protocol_states response from Tycho server");
986
987        Ok(states)
988    }
989
990    async fn get_tokens(
991        &self,
992        request: &TokensRequestBody,
993    ) -> Result<TokensRequestResponse, RPCError> {
994        let uri = format!(
995            "{}/{}/tokens",
996            self.url
997                .to_string()
998                .trim_end_matches('/'),
999            TYCHO_SERVER_VERSION
1000        );
1001        debug!(%uri, "Sending tokens request to Tycho server");
1002
1003        let response = self
1004            .make_post_request(request, &uri)
1005            .await?;
1006
1007        let body = response
1008            .text()
1009            .await
1010            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1011        let tokens = serde_json::from_str::<TokensRequestResponse>(&body)
1012            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1013
1014        Ok(tokens)
1015    }
1016
1017    async fn get_protocol_systems(
1018        &self,
1019        request: &ProtocolSystemsRequestBody,
1020    ) -> Result<ProtocolSystemsRequestResponse, RPCError> {
1021        let uri = format!(
1022            "{}/{}/protocol_systems",
1023            self.url
1024                .to_string()
1025                .trim_end_matches('/'),
1026            TYCHO_SERVER_VERSION
1027        );
1028        debug!(%uri, "Sending protocol_systems request to Tycho server");
1029        trace!(?request, "Sending request to Tycho server");
1030        let response = self
1031            .make_post_request(request, &uri)
1032            .await?;
1033        trace!(?response, "Received response from Tycho server");
1034        let body = response
1035            .text()
1036            .await
1037            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1038        let protocol_systems = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
1039            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1040        trace!(?protocol_systems, "Received protocol_systems response from Tycho server");
1041        Ok(protocol_systems)
1042    }
1043
1044    async fn get_component_tvl(
1045        &self,
1046        request: &ComponentTvlRequestBody,
1047    ) -> Result<ComponentTvlRequestResponse, RPCError> {
1048        let uri = format!(
1049            "{}/{}/component_tvl",
1050            self.url
1051                .to_string()
1052                .trim_end_matches('/'),
1053            TYCHO_SERVER_VERSION
1054        );
1055        debug!(%uri, "Sending get_component_tvl request to Tycho server");
1056        trace!(?request, "Sending request to Tycho server");
1057        let response = self
1058            .make_post_request(request, &uri)
1059            .await?;
1060        trace!(?response, "Received response from Tycho server");
1061        let body = response
1062            .text()
1063            .await
1064            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1065        let component_tvl =
1066            serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
1067                error!("Failed to parse component_tvl response: {:?}", &body);
1068                RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1069            })?;
1070        trace!(?component_tvl, "Received component_tvl response from Tycho server");
1071        Ok(component_tvl)
1072    }
1073
1074    async fn get_traced_entry_points(
1075        &self,
1076        request: &TracedEntryPointRequestBody,
1077    ) -> Result<TracedEntryPointRequestResponse, RPCError> {
1078        let uri = format!(
1079            "{}/{TYCHO_SERVER_VERSION}/traced_entry_points",
1080            self.url
1081                .to_string()
1082                .trim_end_matches('/')
1083        );
1084        debug!(%uri, "Sending traced_entry_points request to Tycho server");
1085        trace!(?request, "Sending request to Tycho server");
1086
1087        let response = self
1088            .make_post_request(request, &uri)
1089            .await?;
1090
1091        trace!(?response, "Received response from Tycho server");
1092
1093        let body = response
1094            .text()
1095            .await
1096            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1097        let entrypoints =
1098            serde_json::from_str::<TracedEntryPointRequestResponse>(&body).map_err(|err| {
1099                error!("Failed to parse traced_entry_points response: {:?}", &body);
1100                RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1101            })?;
1102        trace!(?entrypoints, "Received traced_entry_points response from Tycho server");
1103        Ok(entrypoints)
1104    }
1105
1106    async fn get_snapshots<'a>(
1107        &self,
1108        request: &SnapshotParameters<'a>,
1109        chunk_size: usize,
1110        concurrency: usize,
1111    ) -> Result<Snapshot, RPCError> {
1112        let component_ids: Vec<_> = request
1113            .components
1114            .keys()
1115            .cloned()
1116            .collect();
1117
1118        let version = VersionParam::new(
1119            None,
1120            Some({
1121                #[allow(deprecated)]
1122                BlockParam {
1123                    hash: None,
1124                    chain: Some(request.chain),
1125                    number: Some(request.block_number as i64),
1126                }
1127            }),
1128        );
1129
1130        let component_tvl = if request.include_tvl && !component_ids.is_empty() {
1131            let body = ComponentTvlRequestBody::id_filtered(component_ids.clone(), request.chain);
1132            self.get_component_tvl_paginated(&body, chunk_size, concurrency)
1133                .await?
1134                .tvl
1135        } else {
1136            HashMap::new()
1137        };
1138
1139        let mut protocol_states = if !component_ids.is_empty() {
1140            self.get_protocol_states_paginated(
1141                request.chain,
1142                &component_ids,
1143                request.protocol_system,
1144                request.include_balances,
1145                &version,
1146                chunk_size,
1147                concurrency,
1148            )
1149            .await?
1150            .states
1151            .into_iter()
1152            .map(|state| (state.component_id.clone(), state))
1153            .collect()
1154        } else {
1155            HashMap::new()
1156        };
1157
1158        // Convert to ComponentWithState, which includes entrypoint information.
1159        let states = request
1160            .components
1161            .values()
1162            .filter_map(|component| {
1163                if let Some(state) = protocol_states.remove(&component.id) {
1164                    Some((
1165                        component.id.clone(),
1166                        ComponentWithState {
1167                            state,
1168                            component: component.clone(),
1169                            component_tvl: component_tvl
1170                                .get(&component.id)
1171                                .cloned(),
1172                            entrypoints: request
1173                                .entrypoints
1174                                .as_ref()
1175                                .and_then(|map| map.get(&component.id))
1176                                .cloned()
1177                                .unwrap_or_default(),
1178                        },
1179                    ))
1180                } else if component_ids.contains(&component.id) {
1181                    // only emit error event if we requested this component
1182                    let component_id = &component.id;
1183                    error!(?component_id, "Missing state for native component!");
1184                    None
1185                } else {
1186                    None
1187                }
1188            })
1189            .collect();
1190
1191        let vm_storage = if !request.contract_ids.is_empty() {
1192            let contract_states = self
1193                .get_contract_state_paginated(
1194                    request.chain,
1195                    request.contract_ids,
1196                    request.protocol_system,
1197                    &version,
1198                    chunk_size,
1199                    concurrency,
1200                )
1201                .await?
1202                .accounts
1203                .into_iter()
1204                .map(|acc| (acc.address.clone(), acc))
1205                .collect::<HashMap<_, _>>();
1206
1207            trace!(states=?&contract_states, "Retrieved ContractState");
1208
1209            let contract_address_to_components = request
1210                .components
1211                .iter()
1212                .filter_map(|(id, comp)| {
1213                    if component_ids.contains(id) {
1214                        Some(
1215                            comp.contract_ids
1216                                .iter()
1217                                .map(|address| (address.clone(), comp.id.clone())),
1218                        )
1219                    } else {
1220                        None
1221                    }
1222                })
1223                .flatten()
1224                .fold(HashMap::<Bytes, Vec<String>>::new(), |mut acc, (addr, c_id)| {
1225                    acc.entry(addr).or_default().push(c_id);
1226                    acc
1227                });
1228
1229            request
1230                .contract_ids
1231                .iter()
1232                .filter_map(|address| {
1233                    if let Some(state) = contract_states.get(address) {
1234                        Some((address.clone(), state.clone()))
1235                    } else if let Some(ids) = contract_address_to_components.get(address) {
1236                        // only emit error even if we did actually request this address
1237                        error!(
1238                            ?address,
1239                            ?ids,
1240                            "Component with lacking contract storage encountered!"
1241                        );
1242                        None
1243                    } else {
1244                        None
1245                    }
1246                })
1247                .collect()
1248        } else {
1249            HashMap::new()
1250        };
1251
1252        Ok(Snapshot { states, vm_storage })
1253    }
1254}
1255
1256#[cfg(test)]
1257mod tests {
1258    use std::{
1259        collections::{HashMap, HashSet},
1260        str::FromStr,
1261    };
1262
1263    use mockito::Server;
1264    use rstest::rstest;
1265    // TODO: remove once deprecated ProtocolId struct is removed
1266    #[allow(deprecated)]
1267    use tycho_common::dto::ProtocolId;
1268    use tycho_common::dto::{AddressStorageLocation, TracingParams};
1269
1270    use super::*;
1271
1272    // Dummy implementation of `get_protocol_states_paginated` for backwards compatibility testing
1273    // purposes
1274    impl MockRPCClient {
1275        #[allow(clippy::too_many_arguments)]
1276        async fn test_get_protocol_states_paginated<T>(
1277            &self,
1278            chain: Chain,
1279            ids: &[T],
1280            protocol_system: &str,
1281            include_balances: bool,
1282            version: &VersionParam,
1283            chunk_size: usize,
1284            _concurrency: usize,
1285        ) -> Vec<ProtocolStateRequestBody>
1286        where
1287            T: AsRef<str> + Clone + Send + Sync + 'static,
1288        {
1289            ids.chunks(chunk_size)
1290                .map(|chunk| ProtocolStateRequestBody {
1291                    protocol_ids: Some(
1292                        chunk
1293                            .iter()
1294                            .map(|id| id.as_ref().to_string())
1295                            .collect(),
1296                    ),
1297                    protocol_system: protocol_system.to_string(),
1298                    chain,
1299                    include_balances,
1300                    version: version.clone(),
1301                    pagination: PaginationParams { page: 0, page_size: chunk_size as i64 },
1302                })
1303                .collect()
1304        }
1305    }
1306
1307    // TODO: remove once deprecated ProtocolId struct is removed
1308    #[allow(deprecated)]
1309    #[rstest]
1310    #[case::protocol_id_input(vec![
1311        ProtocolId { id: "id1".to_string(), chain: Chain::Ethereum },
1312        ProtocolId { id: "id2".to_string(), chain: Chain::Ethereum }
1313    ])]
1314    #[case::string_input(vec![
1315        "id1".to_string(),
1316        "id2".to_string()
1317    ])]
1318    #[tokio::test]
1319    async fn test_get_protocol_states_paginated_backwards_compatibility<T>(#[case] ids: Vec<T>)
1320    where
1321        T: AsRef<str> + Clone + Send + Sync + 'static,
1322    {
1323        let mock_client = MockRPCClient::new();
1324
1325        let request_bodies = mock_client
1326            .test_get_protocol_states_paginated(
1327                Chain::Ethereum,
1328                &ids,
1329                "test_system",
1330                true,
1331                &VersionParam::default(),
1332                2,
1333                2,
1334            )
1335            .await;
1336
1337        // Verify that the request bodies have been created correctly
1338        assert_eq!(request_bodies.len(), 1);
1339        assert_eq!(
1340            request_bodies[0]
1341                .protocol_ids
1342                .as_ref()
1343                .unwrap()
1344                .len(),
1345            2
1346        );
1347    }
1348
1349    #[tokio::test]
1350    async fn test_get_contract_state() {
1351        let mut server = Server::new_async().await;
1352        let server_resp = r#"
1353        {
1354            "accounts": [
1355                {
1356                    "chain": "ethereum",
1357                    "address": "0x0000000000000000000000000000000000000000",
1358                    "title": "",
1359                    "slots": {},
1360                    "native_balance": "0x01f4",
1361                    "token_balances": {},
1362                    "code": "0x00",
1363                    "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
1364                    "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
1365                    "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
1366                    "creation_tx": null
1367                }
1368            ],
1369            "pagination": {
1370                "page": 0,
1371                "page_size": 20,
1372                "total": 10
1373            }
1374        }
1375        "#;
1376        // test that the response is deserialized correctly
1377        serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
1378
1379        let mocked_server = server
1380            .mock("POST", "/v1/contract_state")
1381            .expect(1)
1382            .with_body(server_resp)
1383            .create_async()
1384            .await;
1385
1386        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1387
1388        let response = client
1389            .get_contract_state(&Default::default())
1390            .await
1391            .expect("get state");
1392        let accounts = response.accounts;
1393
1394        mocked_server.assert();
1395        assert_eq!(accounts.len(), 1);
1396        assert_eq!(accounts[0].slots, HashMap::new());
1397        assert_eq!(accounts[0].native_balance, Bytes::from(500u16.to_be_bytes()));
1398        assert_eq!(accounts[0].code, [0].to_vec());
1399        assert_eq!(
1400            accounts[0].code_hash,
1401            hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
1402                .unwrap()
1403        );
1404    }
1405
1406    #[tokio::test]
1407    async fn test_get_protocol_components() {
1408        let mut server = Server::new_async().await;
1409        let server_resp = r#"
1410        {
1411            "protocol_components": [
1412                {
1413                    "id": "State1",
1414                    "protocol_system": "ambient",
1415                    "protocol_type_name": "Pool",
1416                    "chain": "ethereum",
1417                    "tokens": [
1418                        "0x0000000000000000000000000000000000000000",
1419                        "0x0000000000000000000000000000000000000001"
1420                    ],
1421                    "contract_ids": [
1422                        "0x0000000000000000000000000000000000000000"
1423                    ],
1424                    "static_attributes": {
1425                        "attribute_1": "0x00000000000003e8"
1426                    },
1427                    "change": "Creation",
1428                    "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
1429                    "created_at": "2022-01-01T00:00:00"
1430                }
1431            ],
1432            "pagination": {
1433                "page": 0,
1434                "page_size": 20,
1435                "total": 10
1436            }
1437        }
1438        "#;
1439        // test that the response is deserialized correctly
1440        serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
1441
1442        let mocked_server = server
1443            .mock("POST", "/v1/protocol_components")
1444            .expect(1)
1445            .with_body(server_resp)
1446            .create_async()
1447            .await;
1448
1449        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1450
1451        let response = client
1452            .get_protocol_components(&Default::default())
1453            .await
1454            .expect("get state");
1455        let components = response.protocol_components;
1456
1457        mocked_server.assert();
1458        assert_eq!(components.len(), 1);
1459        assert_eq!(components[0].id, "State1");
1460        assert_eq!(components[0].protocol_system, "ambient");
1461        assert_eq!(components[0].protocol_type_name, "Pool");
1462        assert_eq!(components[0].tokens.len(), 2);
1463        let expected_attributes =
1464            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1465                .iter()
1466                .cloned()
1467                .collect::<HashMap<String, Bytes>>();
1468        assert_eq!(components[0].static_attributes, expected_attributes);
1469    }
1470
1471    #[tokio::test]
1472    async fn test_get_protocol_states() {
1473        let mut server = Server::new_async().await;
1474        let server_resp = r#"
1475        {
1476            "states": [
1477                {
1478                    "component_id": "State1",
1479                    "attributes": {
1480                        "attribute_1": "0x00000000000003e8"
1481                    },
1482                    "balances": {
1483                        "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
1484                    }
1485                }
1486            ],
1487            "pagination": {
1488                "page": 0,
1489                "page_size": 20,
1490                "total": 10
1491            }
1492        }
1493        "#;
1494        // test that the response is deserialized correctly
1495        serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
1496
1497        let mocked_server = server
1498            .mock("POST", "/v1/protocol_state")
1499            .expect(1)
1500            .with_body(server_resp)
1501            .create_async()
1502            .await;
1503        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1504
1505        let response = client
1506            .get_protocol_states(&Default::default())
1507            .await
1508            .expect("get state");
1509        let states = response.states;
1510
1511        mocked_server.assert();
1512        assert_eq!(states.len(), 1);
1513        assert_eq!(states[0].component_id, "State1");
1514        let expected_attributes =
1515            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
1516                .iter()
1517                .cloned()
1518                .collect::<HashMap<String, Bytes>>();
1519        assert_eq!(states[0].attributes, expected_attributes);
1520        let expected_balances = [(
1521            Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
1522                .expect("Unsupported address format"),
1523            Bytes::from_str("0x01f4").unwrap(),
1524        )]
1525        .iter()
1526        .cloned()
1527        .collect::<HashMap<Bytes, Bytes>>();
1528        assert_eq!(states[0].balances, expected_balances);
1529    }
1530
1531    #[tokio::test]
1532    async fn test_get_tokens() {
1533        let mut server = Server::new_async().await;
1534        let server_resp = r#"
1535        {
1536            "tokens": [
1537              {
1538                "chain": "ethereum",
1539                "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
1540                "symbol": "WETH",
1541                "decimals": 18,
1542                "tax": 0,
1543                "gas": [
1544                  29962
1545                ],
1546                "quality": 100
1547              },
1548              {
1549                "chain": "ethereum",
1550                "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
1551                "symbol": "USDC",
1552                "decimals": 6,
1553                "tax": 0,
1554                "gas": [
1555                  40652
1556                ],
1557                "quality": 100
1558              }
1559            ],
1560            "pagination": {
1561              "page": 0,
1562              "page_size": 20,
1563              "total": 10
1564            }
1565          }
1566        "#;
1567        // test that the response is deserialized correctly
1568        serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
1569
1570        let mocked_server = server
1571            .mock("POST", "/v1/tokens")
1572            .expect(1)
1573            .with_body(server_resp)
1574            .create_async()
1575            .await;
1576        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1577
1578        let response = client
1579            .get_tokens(&Default::default())
1580            .await
1581            .expect("get tokens");
1582
1583        let expected = vec![
1584            ResponseToken {
1585                chain: Chain::Ethereum,
1586                address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
1587                symbol: "WETH".to_string(),
1588                decimals: 18,
1589                tax: 0,
1590                gas: vec![Some(29962)],
1591                quality: 100,
1592            },
1593            ResponseToken {
1594                chain: Chain::Ethereum,
1595                address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
1596                symbol: "USDC".to_string(),
1597                decimals: 6,
1598                tax: 0,
1599                gas: vec![Some(40652)],
1600                quality: 100,
1601            },
1602        ];
1603
1604        mocked_server.assert();
1605        assert_eq!(response.tokens, expected);
1606        assert_eq!(response.pagination, PaginationResponse { page: 0, page_size: 20, total: 10 });
1607    }
1608
1609    #[tokio::test]
1610    async fn test_get_protocol_systems() {
1611        let mut server = Server::new_async().await;
1612        let server_resp = r#"
1613        {
1614            "protocol_systems": [
1615                "system1",
1616                "system2"
1617            ],
1618            "pagination": {
1619                "page": 0,
1620                "page_size": 20,
1621                "total": 10
1622            }
1623        }
1624        "#;
1625        // test that the response is deserialized correctly
1626        serde_json::from_str::<ProtocolSystemsRequestResponse>(server_resp).expect("deserialize");
1627
1628        let mocked_server = server
1629            .mock("POST", "/v1/protocol_systems")
1630            .expect(1)
1631            .with_body(server_resp)
1632            .create_async()
1633            .await;
1634        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1635
1636        let response = client
1637            .get_protocol_systems(&Default::default())
1638            .await
1639            .expect("get protocol systems");
1640        let protocol_systems = response.protocol_systems;
1641
1642        mocked_server.assert();
1643        assert_eq!(protocol_systems, vec!["system1", "system2"]);
1644    }
1645
1646    #[tokio::test]
1647    async fn test_get_component_tvl() {
1648        let mut server = Server::new_async().await;
1649        let server_resp = r#"
1650        {
1651            "tvl": {
1652                "component1": 100.0
1653            },
1654            "pagination": {
1655                "page": 0,
1656                "page_size": 20,
1657                "total": 10
1658            }
1659        }
1660        "#;
1661        // test that the response is deserialized correctly
1662        serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
1663
1664        let mocked_server = server
1665            .mock("POST", "/v1/component_tvl")
1666            .expect(1)
1667            .with_body(server_resp)
1668            .create_async()
1669            .await;
1670        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1671
1672        let response = client
1673            .get_component_tvl(&Default::default())
1674            .await
1675            .expect("get protocol systems");
1676        let component_tvl = response.tvl;
1677
1678        mocked_server.assert();
1679        assert_eq!(component_tvl.get("component1"), Some(&100.0));
1680    }
1681
1682    #[tokio::test]
1683    async fn test_get_traced_entry_points() {
1684        let mut server = Server::new_async().await;
1685        let server_resp = r#"
1686        {
1687            "traced_entry_points": {
1688                "component_1": [
1689                    [
1690                        {
1691                            "entry_point": {
1692                                "external_id": "entrypoint_a",
1693                                "target": "0x0000000000000000000000000000000000000001",
1694                                "signature": "sig()"
1695                            },
1696                            "params": {
1697                                "method": "rpctracer",
1698                                "caller": "0x000000000000000000000000000000000000000a",
1699                                "calldata": "0x000000000000000000000000000000000000000b"
1700                            }
1701                        },
1702                        {
1703                            "retriggers": [
1704                                [
1705                                    "0x00000000000000000000000000000000000000aa",
1706                                    {"key": "0x0000000000000000000000000000000000000aaa", "offset": 12}
1707                                ]
1708                            ],
1709                            "accessed_slots": {
1710                                "0x0000000000000000000000000000000000aaaa": [
1711                                    "0x0000000000000000000000000000000000aaaa"
1712                                ]
1713                            }
1714                        }
1715                    ]
1716                ]
1717            },
1718            "pagination": {
1719                "page": 0,
1720                "page_size": 20,
1721                "total": 1
1722            }
1723        }
1724        "#;
1725        // test that the response is deserialized correctly
1726        serde_json::from_str::<TracedEntryPointRequestResponse>(server_resp).expect("deserialize");
1727
1728        let mocked_server = server
1729            .mock("POST", "/v1/traced_entry_points")
1730            .expect(1)
1731            .with_body(server_resp)
1732            .create_async()
1733            .await;
1734        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
1735
1736        let response = client
1737            .get_traced_entry_points(&Default::default())
1738            .await
1739            .expect("get traced entry points");
1740        let entrypoints = response.traced_entry_points;
1741
1742        mocked_server.assert();
1743        assert_eq!(entrypoints.len(), 1);
1744        let comp1_entrypoints = entrypoints
1745            .get("component_1")
1746            .expect("component_1 entrypoints should exist");
1747        assert_eq!(comp1_entrypoints.len(), 1);
1748
1749        let (entrypoint, trace_result) = &comp1_entrypoints[0];
1750        assert_eq!(entrypoint.entry_point.external_id, "entrypoint_a");
1751        assert_eq!(
1752            entrypoint.entry_point.target,
1753            Bytes::from_str("0x0000000000000000000000000000000000000001").unwrap()
1754        );
1755        assert_eq!(entrypoint.entry_point.signature, "sig()");
1756        let TracingParams::RPCTracer(rpc_params) = &entrypoint.params;
1757        assert_eq!(
1758            rpc_params.caller,
1759            Some(Bytes::from("0x000000000000000000000000000000000000000a"))
1760        );
1761        assert_eq!(rpc_params.calldata, Bytes::from("0x000000000000000000000000000000000000000b"));
1762
1763        assert_eq!(
1764            trace_result.retriggers,
1765            HashSet::from([(
1766                Bytes::from("0x00000000000000000000000000000000000000aa"),
1767                AddressStorageLocation::new(
1768                    Bytes::from("0x0000000000000000000000000000000000000aaa"),
1769                    12
1770                )
1771            )])
1772        );
1773        assert_eq!(trace_result.accessed_slots.len(), 1);
1774        assert_eq!(
1775            trace_result.accessed_slots,
1776            HashMap::from([(
1777                Bytes::from("0x0000000000000000000000000000000000aaaa"),
1778                HashSet::from([Bytes::from("0x0000000000000000000000000000000000aaaa")])
1779            )])
1780        );
1781    }
1782
1783    #[tokio::test]
1784    async fn test_parse_retry_value_numeric() {
1785        let result = parse_retry_value("60");
1786        assert!(result.is_some());
1787
1788        let expected_time = SystemTime::now() + Duration::from_secs(60);
1789        let actual_time = result.unwrap();
1790
1791        // Allow for small timing differences during test execution
1792        let diff = if actual_time > expected_time {
1793            actual_time
1794                .duration_since(expected_time)
1795                .unwrap()
1796        } else {
1797            expected_time
1798                .duration_since(actual_time)
1799                .unwrap()
1800        };
1801        assert!(diff < Duration::from_secs(1), "Time difference too large: {:?}", diff);
1802    }
1803
1804    #[tokio::test]
1805    async fn test_parse_retry_value_rfc2822() {
1806        // Use a fixed future date in RFC2822 format
1807        let rfc2822_date = "Sat, 01 Jan 2030 12:00:00 +0000";
1808        let result = parse_retry_value(rfc2822_date);
1809        assert!(result.is_some());
1810
1811        let parsed_time = result.unwrap();
1812        assert!(parsed_time > SystemTime::now());
1813    }
1814
1815    #[tokio::test]
1816    async fn test_parse_retry_value_invalid_formats() {
1817        // Test various invalid formats
1818        assert!(parse_retry_value("invalid").is_none());
1819        assert!(parse_retry_value("").is_none());
1820        assert!(parse_retry_value("not_a_number").is_none());
1821        assert!(parse_retry_value("Mon, 32 Jan 2030 25:00:00 +0000").is_none()); // Invalid date
1822    }
1823
1824    #[tokio::test]
1825    async fn test_parse_retry_value_zero_seconds() {
1826        let result = parse_retry_value("0");
1827        assert!(result.is_some());
1828
1829        let expected_time = SystemTime::now();
1830        let actual_time = result.unwrap();
1831
1832        // Should be very close to current time
1833        let diff = if actual_time > expected_time {
1834            actual_time
1835                .duration_since(expected_time)
1836                .unwrap()
1837        } else {
1838            expected_time
1839                .duration_since(actual_time)
1840                .unwrap()
1841        };
1842        assert!(diff < Duration::from_secs(1));
1843    }
1844
1845    #[tokio::test]
1846    async fn test_error_for_response_rate_limited() {
1847        let mut server = Server::new_async().await;
1848        let mock = server
1849            .mock("GET", "/test")
1850            .with_status(429)
1851            .with_header("Retry-After", "60")
1852            .create_async()
1853            .await;
1854
1855        let client = reqwest::Client::new();
1856        let response = client
1857            .get(format!("{}/test", server.url()))
1858            .send()
1859            .await
1860            .unwrap();
1861
1862        let http_client = HttpRPCClient::new(server.url().as_str(), None)
1863            .unwrap()
1864            .with_test_backoff_policy();
1865        let result = http_client
1866            .error_for_response(response)
1867            .await;
1868
1869        mock.assert();
1870        assert!(matches!(result, Err(RPCError::RateLimited(_))));
1871        if let Err(RPCError::RateLimited(retry_after)) = result {
1872            assert!(retry_after.is_some());
1873        }
1874    }
1875
1876    #[tokio::test]
1877    async fn test_error_for_response_rate_limited_no_header() {
1878        let mut server = Server::new_async().await;
1879        let mock = server
1880            .mock("GET", "/test")
1881            .with_status(429)
1882            .create_async()
1883            .await;
1884
1885        let client = reqwest::Client::new();
1886        let response = client
1887            .get(format!("{}/test", server.url()))
1888            .send()
1889            .await
1890            .unwrap();
1891
1892        let http_client = HttpRPCClient::new(server.url().as_str(), None)
1893            .unwrap()
1894            .with_test_backoff_policy();
1895        let result = http_client
1896            .error_for_response(response)
1897            .await;
1898
1899        mock.assert();
1900        assert!(matches!(result, Err(RPCError::RateLimited(None))));
1901    }
1902
1903    #[tokio::test]
1904    async fn test_error_for_response_server_errors() {
1905        let test_cases =
1906            vec![(502, "Bad Gateway"), (503, "Service Unavailable"), (504, "Gateway Timeout")];
1907
1908        for (status_code, expected_body) in test_cases {
1909            let mut server = Server::new_async().await;
1910            let mock = server
1911                .mock("GET", "/test")
1912                .with_status(status_code)
1913                .with_body(expected_body)
1914                .create_async()
1915                .await;
1916
1917            let client = reqwest::Client::new();
1918            let response = client
1919                .get(format!("{}/test", server.url()))
1920                .send()
1921                .await
1922                .unwrap();
1923
1924            let http_client = HttpRPCClient::new(server.url().as_str(), None)
1925                .unwrap()
1926                .with_test_backoff_policy();
1927            let result = http_client
1928                .error_for_response(response)
1929                .await;
1930
1931            mock.assert();
1932            assert!(matches!(result, Err(RPCError::ServerUnreachable(_))));
1933            if let Err(RPCError::ServerUnreachable(body)) = result {
1934                assert_eq!(body, expected_body);
1935            }
1936        }
1937    }
1938
1939    #[tokio::test]
1940    async fn test_error_for_response_success() {
1941        let mut server = Server::new_async().await;
1942        let mock = server
1943            .mock("GET", "/test")
1944            .with_status(200)
1945            .with_body("success")
1946            .create_async()
1947            .await;
1948
1949        let client = reqwest::Client::new();
1950        let response = client
1951            .get(format!("{}/test", server.url()))
1952            .send()
1953            .await
1954            .unwrap();
1955
1956        let http_client = HttpRPCClient::new(server.url().as_str(), None)
1957            .unwrap()
1958            .with_test_backoff_policy();
1959        let result = http_client
1960            .error_for_response(response)
1961            .await;
1962
1963        mock.assert();
1964        assert!(result.is_ok());
1965
1966        let response = result.unwrap();
1967        assert_eq!(response.status(), 200);
1968    }
1969
1970    #[tokio::test]
1971    async fn test_handle_error_for_backoff_server_unreachable() {
1972        let http_client = HttpRPCClient::new("http://localhost:8080", None)
1973            .unwrap()
1974            .with_test_backoff_policy();
1975        let error = RPCError::ServerUnreachable("Service down".to_string());
1976
1977        let backoff_error = http_client
1978            .handle_error_for_backoff(error)
1979            .await;
1980
1981        match backoff_error {
1982            backoff::Error::Transient { err: RPCError::ServerUnreachable(msg), retry_after } => {
1983                assert_eq!(msg, "Service down");
1984                assert_eq!(retry_after, Some(Duration::from_millis(50))); // Fast test duration
1985            }
1986            _ => panic!("Expected transient error for ServerUnreachable"),
1987        }
1988    }
1989
1990    #[tokio::test]
1991    async fn test_handle_error_for_backoff_rate_limited_with_retry_after() {
1992        let http_client = HttpRPCClient::new("http://localhost:8080", None)
1993            .unwrap()
1994            .with_test_backoff_policy();
1995        let future_time = SystemTime::now() + Duration::from_secs(30);
1996        let error = RPCError::RateLimited(Some(future_time));
1997
1998        let backoff_error = http_client
1999            .handle_error_for_backoff(error)
2000            .await;
2001
2002        match backoff_error {
2003            backoff::Error::Transient { err: RPCError::RateLimited(retry_after), .. } => {
2004                assert_eq!(retry_after, Some(future_time));
2005            }
2006            _ => panic!("Expected transient error for RateLimited"),
2007        }
2008
2009        // Verify that retry_after was stored in the client state
2010        let stored_retry_after = http_client.retry_after.read().await;
2011        assert_eq!(*stored_retry_after, Some(future_time));
2012    }
2013
2014    #[tokio::test]
2015    async fn test_handle_error_for_backoff_rate_limited_no_retry_after() {
2016        let http_client = HttpRPCClient::new("http://localhost:8080", None)
2017            .unwrap()
2018            .with_test_backoff_policy();
2019        let error = RPCError::RateLimited(None);
2020
2021        let backoff_error = http_client
2022            .handle_error_for_backoff(error)
2023            .await;
2024
2025        match backoff_error {
2026            backoff::Error::Transient { err: RPCError::RateLimited(None), .. } => {
2027                // This is expected - no retry-after still allows retries with default policy
2028            }
2029            _ => panic!("Expected transient error for RateLimited without retry-after"),
2030        }
2031    }
2032
2033    #[tokio::test]
2034    async fn test_handle_error_for_backoff_other_errors() {
2035        let http_client = HttpRPCClient::new("http://localhost:8080", None)
2036            .unwrap()
2037            .with_test_backoff_policy();
2038        let error = RPCError::ParseResponse("Invalid JSON".to_string());
2039
2040        let backoff_error = http_client
2041            .handle_error_for_backoff(error)
2042            .await;
2043
2044        match backoff_error {
2045            backoff::Error::Permanent(RPCError::ParseResponse(msg)) => {
2046                assert_eq!(msg, "Invalid JSON");
2047            }
2048            _ => panic!("Expected permanent error for ParseResponse"),
2049        }
2050    }
2051
2052    #[tokio::test]
2053    async fn test_wait_until_retry_after_no_retry_time() {
2054        let http_client = HttpRPCClient::new("http://localhost:8080", None)
2055            .unwrap()
2056            .with_test_backoff_policy();
2057
2058        let start = std::time::Instant::now();
2059        http_client
2060            .wait_until_retry_after()
2061            .await;
2062        let elapsed = start.elapsed();
2063
2064        // Should return immediately if no retry time is set
2065        assert!(elapsed < Duration::from_millis(100));
2066    }
2067
2068    #[tokio::test]
2069    async fn test_wait_until_retry_after_past_time() {
2070        let http_client = HttpRPCClient::new("http://localhost:8080", None)
2071            .unwrap()
2072            .with_test_backoff_policy();
2073
2074        // Set a retry time in the past
2075        let past_time = SystemTime::now() - Duration::from_secs(10);
2076        *http_client.retry_after.write().await = Some(past_time);
2077
2078        let start = std::time::Instant::now();
2079        http_client
2080            .wait_until_retry_after()
2081            .await;
2082        let elapsed = start.elapsed();
2083
2084        // Should return immediately if retry time is in the past
2085        assert!(elapsed < Duration::from_millis(100));
2086    }
2087
2088    #[tokio::test]
2089    async fn test_wait_until_retry_after_future_time() {
2090        let http_client = HttpRPCClient::new("http://localhost:8080", None)
2091            .unwrap()
2092            .with_test_backoff_policy();
2093
2094        // Set a retry time 100ms in the future
2095        let future_time = SystemTime::now() + Duration::from_millis(100);
2096        *http_client.retry_after.write().await = Some(future_time);
2097
2098        let start = std::time::Instant::now();
2099        http_client
2100            .wait_until_retry_after()
2101            .await;
2102        let elapsed = start.elapsed();
2103
2104        // Should wait approximately the specified duration
2105        assert!(elapsed >= Duration::from_millis(80)); // Allow some tolerance
2106        assert!(elapsed <= Duration::from_millis(200)); // Upper bound for test stability
2107    }
2108
2109    #[tokio::test]
2110    async fn test_make_post_request_success() {
2111        let mut server = Server::new_async().await;
2112        let server_resp = r#"{"success": true}"#;
2113
2114        let mock = server
2115            .mock("POST", "/test")
2116            .with_status(200)
2117            .with_body(server_resp)
2118            .create_async()
2119            .await;
2120
2121        let http_client = HttpRPCClient::new(server.url().as_str(), None)
2122            .unwrap()
2123            .with_test_backoff_policy();
2124        let request_body = serde_json::json!({"test": "data"});
2125        let uri = format!("{}/test", server.url());
2126
2127        let result = http_client
2128            .make_post_request(&request_body, &uri)
2129            .await;
2130
2131        mock.assert();
2132        assert!(result.is_ok());
2133
2134        let response = result.unwrap();
2135        assert_eq!(response.status(), 200);
2136        assert_eq!(response.text().await.unwrap(), server_resp);
2137    }
2138
2139    #[tokio::test]
2140    async fn test_make_post_request_retry_on_server_error() {
2141        let mut server = Server::new_async().await;
2142        // First request fails with 503, second succeeds
2143        let error_mock = server
2144            .mock("POST", "/test")
2145            .with_status(503)
2146            .with_body("Service Unavailable")
2147            .expect(1)
2148            .create_async()
2149            .await;
2150
2151        let success_mock = server
2152            .mock("POST", "/test")
2153            .with_status(200)
2154            .with_body(r#"{"success": true}"#)
2155            .expect(1)
2156            .create_async()
2157            .await;
2158
2159        let http_client = HttpRPCClient::new(server.url().as_str(), None)
2160            .unwrap()
2161            .with_test_backoff_policy();
2162        let request_body = serde_json::json!({"test": "data"});
2163        let uri = format!("{}/test", server.url());
2164
2165        let result = http_client
2166            .make_post_request(&request_body, &uri)
2167            .await;
2168
2169        error_mock.assert();
2170        success_mock.assert();
2171        assert!(result.is_ok());
2172    }
2173
2174    #[tokio::test]
2175    async fn test_make_post_request_respect_retry_after_header() {
2176        let mut server = Server::new_async().await;
2177
2178        // First request returns 429 with retry-after, second succeeds
2179        let rate_limit_mock = server
2180            .mock("POST", "/test")
2181            .with_status(429)
2182            .with_header("Retry-After", "1") // 1 second
2183            .expect(1)
2184            .create_async()
2185            .await;
2186
2187        let success_mock = server
2188            .mock("POST", "/test")
2189            .with_status(200)
2190            .with_body(r#"{"success": true}"#)
2191            .expect(1)
2192            .create_async()
2193            .await;
2194
2195        let http_client = HttpRPCClient::new(server.url().as_str(), None)
2196            .unwrap()
2197            .with_test_backoff_policy();
2198        let request_body = serde_json::json!({"test": "data"});
2199        let uri = format!("{}/test", server.url());
2200
2201        let start = std::time::Instant::now();
2202        let result = http_client
2203            .make_post_request(&request_body, &uri)
2204            .await;
2205        let elapsed = start.elapsed();
2206
2207        rate_limit_mock.assert();
2208        success_mock.assert();
2209        assert!(result.is_ok());
2210
2211        // Should have waited at least 1 second due to retry-after header
2212        assert!(elapsed >= Duration::from_millis(900)); // Allow some tolerance
2213        assert!(elapsed <= Duration::from_millis(2000)); // Upper bound for test stability
2214    }
2215
2216    #[tokio::test]
2217    async fn test_make_post_request_permanent_error() {
2218        let mut server = Server::new_async().await;
2219
2220        let mock = server
2221            .mock("POST", "/test")
2222            .with_status(400) // Bad Request - should not be retried
2223            .with_body("Bad Request")
2224            .expect(1)
2225            .create_async()
2226            .await;
2227
2228        let http_client = HttpRPCClient::new(server.url().as_str(), None)
2229            .unwrap()
2230            .with_test_backoff_policy();
2231        let request_body = serde_json::json!({"test": "data"});
2232        let uri = format!("{}/test", server.url());
2233
2234        let result = http_client
2235            .make_post_request(&request_body, &uri)
2236            .await;
2237
2238        mock.assert();
2239        assert!(result.is_ok()); // 400 doesn't trigger retry logic, just returns the response
2240
2241        let response = result.unwrap();
2242        assert_eq!(response.status(), 400);
2243    }
2244
2245    #[tokio::test]
2246    async fn test_concurrent_requests_with_different_retry_after() {
2247        let mut server = Server::new_async().await;
2248
2249        // First request gets rate limited with 1 second retry-after
2250        let rate_limit_mock_1 = server
2251            .mock("POST", "/test1")
2252            .with_status(429)
2253            .with_header("Retry-After", "1")
2254            .expect(1)
2255            .create_async()
2256            .await;
2257
2258        // Second request gets rate limited with 2 second retry-after
2259        let rate_limit_mock_2 = server
2260            .mock("POST", "/test2")
2261            .with_status(429)
2262            .with_header("Retry-After", "2")
2263            .expect(1)
2264            .create_async()
2265            .await;
2266
2267        // Success mocks for retries
2268        let success_mock_1 = server
2269            .mock("POST", "/test1")
2270            .with_status(200)
2271            .with_body(r#"{"result": "success1"}"#)
2272            .expect(1)
2273            .create_async()
2274            .await;
2275
2276        let success_mock_2 = server
2277            .mock("POST", "/test2")
2278            .with_status(200)
2279            .with_body(r#"{"result": "success2"}"#)
2280            .expect(1)
2281            .create_async()
2282            .await;
2283
2284        let http_client = HttpRPCClient::new(server.url().as_str(), None)
2285            .unwrap()
2286            .with_test_backoff_policy();
2287        let request_body = serde_json::json!({"test": "data"});
2288
2289        let uri1 = format!("{}/test1", server.url());
2290        let uri2 = format!("{}/test2", server.url());
2291
2292        // Start both requests concurrently
2293        let start = std::time::Instant::now();
2294        let (result1, result2) = tokio::join!(
2295            http_client.make_post_request(&request_body, &uri1),
2296            http_client.make_post_request(&request_body, &uri2)
2297        );
2298        let elapsed = start.elapsed();
2299
2300        rate_limit_mock_1.assert();
2301        rate_limit_mock_2.assert();
2302        success_mock_1.assert();
2303        success_mock_2.assert();
2304
2305        assert!(result1.is_ok());
2306        assert!(result2.is_ok());
2307
2308        // Both requests should succeed, but the second should take longer due to the 2s retry-after
2309        // The total time should be at least 2 seconds since the shared retry_after state
2310        // gets updated by both requests
2311        assert!(elapsed >= Duration::from_millis(1800)); // Allow some tolerance
2312        assert!(elapsed <= Duration::from_millis(3000)); // Upper bound
2313
2314        // Check the final retry_after state - should be the latest (higher) value
2315        let final_retry_after = http_client.retry_after.read().await;
2316        assert!(final_retry_after.is_some());
2317
2318        // The retry_after should be set to the latest (higher) value from the two requests
2319        if let Some(retry_time) = *final_retry_after {
2320            // The retry_after time might be in the past now since we waited,
2321            // but it should be reasonable (not too far in past/future)
2322            let now = SystemTime::now();
2323            let diff = if retry_time > now {
2324                retry_time.duration_since(now).unwrap()
2325            } else {
2326                now.duration_since(retry_time).unwrap()
2327            };
2328
2329            // Should be within a reasonable range (the 2s retry-after plus some buffer)
2330            assert!(diff <= Duration::from_secs(3), "Retry time difference too large: {:?}", diff);
2331        }
2332    }
2333
2334    #[tokio::test]
2335    async fn test_get_snapshots() {
2336        let mut server = Server::new_async().await;
2337
2338        // Mock protocol states response
2339        let protocol_states_resp = r#"
2340        {
2341            "states": [
2342                {
2343                    "component_id": "component1",
2344                    "attributes": {
2345                        "attribute_1": "0x00000000000003e8"
2346                    },
2347                    "balances": {
2348                        "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
2349                    }
2350                }
2351            ],
2352            "pagination": {
2353                "page": 0,
2354                "page_size": 100,
2355                "total": 1
2356            }
2357        }
2358        "#;
2359
2360        // Mock contract state response
2361        let contract_state_resp = r#"
2362        {
2363            "accounts": [
2364                {
2365                    "chain": "ethereum",
2366                    "address": "0x1111111111111111111111111111111111111111",
2367                    "title": "",
2368                    "slots": {},
2369                    "native_balance": "0x01f4",
2370                    "token_balances": {},
2371                    "code": "0x00",
2372                    "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
2373                    "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2374                    "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2375                    "creation_tx": null
2376                }
2377            ],
2378            "pagination": {
2379                "page": 0,
2380                "page_size": 100,
2381                "total": 1
2382            }
2383        }
2384        "#;
2385
2386        // Mock component TVL response
2387        let tvl_resp = r#"
2388        {
2389            "tvl": {
2390                "component1": 1000000.0
2391            },
2392            "pagination": {
2393                "page": 0,
2394                "page_size": 100,
2395                "total": 1
2396            }
2397        }
2398        "#;
2399
2400        let protocol_states_mock = server
2401            .mock("POST", "/v1/protocol_state")
2402            .expect(1)
2403            .with_body(protocol_states_resp)
2404            .create_async()
2405            .await;
2406
2407        let contract_state_mock = server
2408            .mock("POST", "/v1/contract_state")
2409            .expect(1)
2410            .with_body(contract_state_resp)
2411            .create_async()
2412            .await;
2413
2414        let tvl_mock = server
2415            .mock("POST", "/v1/component_tvl")
2416            .expect(1)
2417            .with_body(tvl_resp)
2418            .create_async()
2419            .await;
2420
2421        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
2422
2423        #[allow(deprecated)]
2424        let component = tycho_common::dto::ProtocolComponent {
2425            id: "component1".to_string(),
2426            protocol_system: "test_protocol".to_string(),
2427            protocol_type_name: "test_type".to_string(),
2428            chain: Chain::Ethereum,
2429            tokens: vec![],
2430            contract_ids: vec![
2431                Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()
2432            ],
2433            static_attributes: HashMap::new(),
2434            change: tycho_common::dto::ChangeType::Creation,
2435            creation_tx: Bytes::from_str(
2436                "0x0000000000000000000000000000000000000000000000000000000000000000",
2437            )
2438            .unwrap(),
2439            created_at: chrono::Utc::now().naive_utc(),
2440        };
2441
2442        let mut components = HashMap::new();
2443        components.insert("component1".to_string(), component);
2444
2445        let contract_ids =
2446            vec![Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()];
2447
2448        let request = SnapshotParameters::new(
2449            Chain::Ethereum,
2450            "test_protocol",
2451            &components,
2452            &contract_ids,
2453            12345,
2454        );
2455
2456        let response = client
2457            .get_snapshots(&request, 100, 4)
2458            .await
2459            .expect("get snapshots");
2460
2461        // Verify all mocks were called
2462        protocol_states_mock.assert();
2463        contract_state_mock.assert();
2464        tvl_mock.assert();
2465
2466        // Assert states
2467        assert_eq!(response.states.len(), 1);
2468        assert!(response
2469            .states
2470            .contains_key("component1"));
2471
2472        // Check that the state has the expected TVL
2473        let component_state = response
2474            .states
2475            .get("component1")
2476            .unwrap();
2477        assert_eq!(component_state.component_tvl, Some(1000000.0));
2478
2479        // Assert VM storage
2480        assert_eq!(response.vm_storage.len(), 1);
2481        let contract_addr = Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap();
2482        assert!(response
2483            .vm_storage
2484            .contains_key(&contract_addr));
2485    }
2486
2487    #[tokio::test]
2488    async fn test_get_snapshots_empty_components() {
2489        let server = Server::new_async().await;
2490        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
2491
2492        let components = HashMap::new();
2493        let contract_ids = vec![];
2494
2495        let request = SnapshotParameters::new(
2496            Chain::Ethereum,
2497            "test_protocol",
2498            &components,
2499            &contract_ids,
2500            12345,
2501        );
2502
2503        let response = client
2504            .get_snapshots(&request, 100, 4)
2505            .await
2506            .expect("get snapshots");
2507
2508        // Should return empty response without making any requests
2509        assert!(response.states.is_empty());
2510        assert!(response.vm_storage.is_empty());
2511    }
2512
2513    #[tokio::test]
2514    async fn test_get_snapshots_without_tvl() {
2515        let mut server = Server::new_async().await;
2516
2517        let protocol_states_resp = r#"
2518        {
2519            "states": [
2520                {
2521                    "component_id": "component1",
2522                    "attributes": {},
2523                    "balances": {}
2524                }
2525            ],
2526            "pagination": {
2527                "page": 0,
2528                "page_size": 100,
2529                "total": 1
2530            }
2531        }
2532        "#;
2533
2534        let protocol_states_mock = server
2535            .mock("POST", "/v1/protocol_state")
2536            .expect(1)
2537            .with_body(protocol_states_resp)
2538            .create_async()
2539            .await;
2540
2541        let client = HttpRPCClient::new(server.url().as_str(), None).expect("create client");
2542
2543        // Create test component
2544        #[allow(deprecated)]
2545        let component = tycho_common::dto::ProtocolComponent {
2546            id: "component1".to_string(),
2547            protocol_system: "test_protocol".to_string(),
2548            protocol_type_name: "test_type".to_string(),
2549            chain: Chain::Ethereum,
2550            tokens: vec![],
2551            contract_ids: vec![],
2552            static_attributes: HashMap::new(),
2553            change: tycho_common::dto::ChangeType::Creation,
2554            creation_tx: Bytes::from_str(
2555                "0x0000000000000000000000000000000000000000000000000000000000000000",
2556            )
2557            .unwrap(),
2558            created_at: chrono::Utc::now().naive_utc(),
2559        };
2560
2561        let mut components = HashMap::new();
2562        components.insert("component1".to_string(), component);
2563        let contract_ids = vec![];
2564
2565        let request = SnapshotParameters::new(
2566            Chain::Ethereum,
2567            "test_protocol",
2568            &components,
2569            &contract_ids,
2570            12345,
2571        )
2572        .include_balances(false)
2573        .include_tvl(false);
2574
2575        let response = client
2576            .get_snapshots(&request, 100, 4)
2577            .await
2578            .expect("get snapshots");
2579
2580        // Verify only necessary mocks were called
2581        protocol_states_mock.assert();
2582        // No contract_state_mock.assert() since contract_ids is empty
2583        // No tvl_mock.assert() since include_tvl is false
2584
2585        assert_eq!(response.states.len(), 1);
2586        // Check that TVL is None since we didn't request it
2587        let component_state = response
2588            .states
2589            .get("component1")
2590            .unwrap();
2591        assert_eq!(component_state.component_tvl, None);
2592    }
2593}