Skip to main content

tycho_client/
rpc.rs

1//! # Tycho RPC Client
2//!
3//! The objective of this module is to provide swift and simplified access to the Remote Procedure
4//! Call (RPC) endpoints of Tycho. These endpoints are chiefly responsible for facilitating data
5//! queries, especially querying snapshots of data.
6use std::{
7    collections::HashMap,
8    sync::Arc,
9    time::{Duration, SystemTime},
10};
11
12use async_trait::async_trait;
13use backoff::{exponential::ExponentialBackoffBuilder, ExponentialBackoff};
14use futures03::future::try_join_all;
15#[cfg(test)]
16use mockall::automock;
17use reqwest::{header, Client, ClientBuilder, Response, StatusCode, Url};
18use serde::Serialize;
19use thiserror::Error;
20use time::{format_description::well_known::Rfc2822, OffsetDateTime};
21use tokio::{
22    sync::{RwLock, Semaphore},
23    time::sleep,
24};
25use tracing::{debug, error, instrument, trace, warn};
26use tycho_common::{
27    dto::{
28        ComponentTvlRequestBody, ComponentTvlRequestResponse, PaginationLimits, PaginationParams,
29        ProtocolComponentRequestResponse, ProtocolComponentsRequestBody, ProtocolStateRequestBody,
30        ProtocolStateRequestResponse, ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse,
31        StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
32        TracedEntryPointRequestBody, TracedEntryPointRequestResponse, VersionParam,
33    },
34    models::{
35        blockchain::{EntryPointWithTracingParams, TracedEntryPoints, TracingResult},
36        contract::Account,
37        protocol::{ProtocolComponent, ProtocolComponentState},
38        token::Token,
39        Chain, ComponentId,
40    },
41    Bytes,
42};
43
/// Data payload returned by `RPCClient::get_protocol_systems`.
///
/// Holds two lists of protocol names; both are exposed read-only through accessors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolSystems {
    protocol_systems: Vec<String>,
    dci_protocols: Vec<String>,
}

impl ProtocolSystems {
    /// Bundles the two name lists into a `ProtocolSystems` value.
    pub(crate) fn new(protocol_systems: Vec<String>, dci_protocols: Vec<String>) -> Self {
        ProtocolSystems { protocol_systems, dci_protocols }
    }

    /// Borrows the stored protocol system names.
    pub fn protocol_systems(&self) -> &[String] {
        self.protocol_systems.as_slice()
    }

    /// Borrows the stored DCI protocol names.
    pub fn dci_protocols(&self) -> &[String] {
        self.dci_protocols.as_slice()
    }
}
64
/// An RPC response page, bundling data with the pagination metadata the server returned.
///
/// `total`, `page` and `page_size` are stored exactly as passed to [`Page::new`].
#[derive(Debug, Clone, PartialEq)]
pub struct Page<T> {
    data: T,
    total: i64,
    page: i64,
    page_size: i64,
}

impl<T> Page<T> {
    /// Wraps `data` together with its pagination metadata.
    pub fn new(data: T, total: i64, page: i64, page_size: i64) -> Self {
        Self { data, total, page, page_size }
    }

    /// Borrows the payload.
    pub fn data(&self) -> &T {
        let Self { data, .. } = self;
        data
    }

    /// Consumes the page and returns the payload, discarding the metadata.
    pub fn into_data(self) -> T {
        let Self { data, .. } = self;
        data
    }

    /// Returns the stored `total` value.
    pub fn total(&self) -> i64 {
        self.total
    }

    /// Returns the stored page index.
    pub fn page(&self) -> i64 {
        self.page
    }

    /// Returns the stored page size.
    pub fn page_size(&self) -> i64 {
        self.page_size
    }
}
99
100impl<T> Page<Vec<T>> {
101    pub fn len(&self) -> usize {
102        self.data.len()
103    }
104
105    pub fn is_empty(&self) -> bool {
106        self.data.is_empty()
107    }
108}
109
110impl<K, V, S: std::hash::BuildHasher> Page<HashMap<K, V, S>> {
111    pub fn len(&self) -> usize {
112        self.data.len()
113    }
114
115    pub fn is_empty(&self) -> bool {
116        self.data.is_empty()
117    }
118}
119
120impl<T: IntoIterator> IntoIterator for Page<T> {
121    type Item = T::Item;
122    type IntoIter = T::IntoIter;
123
124    fn into_iter(self) -> Self::IntoIter {
125        self.data.into_iter()
126    }
127}
128
129impl<'a, T> IntoIterator for &'a Page<T>
130where
131    &'a T: IntoIterator,
132{
133    type Item = <&'a T as IntoIterator>::Item;
134    type IntoIter = <&'a T as IntoIterator>::IntoIter;
135
136    fn into_iter(self) -> Self::IntoIter {
137        (&self.data).into_iter()
138    }
139}
140
141use crate::{
142    feed::synchronizer::{ComponentWithState, Snapshot},
143    TYCHO_SERVER_VERSION,
144};
145
/// Suggested concurrency level for RPC clients.
///
/// NOTE(review): presumably meant as the `concurrency` argument of the `*PaginatedParams`
/// constructors in this module (e.g. [`ContractStatePaginatedParams::new`]) — confirm against
/// callers.
pub const RPC_CLIENT_CONCURRENCY: usize = 4;
148
149/// Parameters for [`RPCClient::get_contract_state`].
150#[derive(Clone, PartialEq, Debug)]
151pub struct ContractStateParams {
152    chain: Chain,
153    protocol_system: String,
154    contract_ids: Option<Vec<Bytes>>,
155    version: VersionParam,
156    page: i64,
157    page_size: i64,
158}
159
160impl ContractStateParams {
161    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
162        Self {
163            chain,
164            protocol_system: protocol_system.into(),
165            contract_ids: None,
166            version: VersionParam::default(),
167            page: 0,
168            page_size: StateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
169        }
170    }
171
172    pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
173        self.contract_ids = Some(ids);
174        self
175    }
176
177    pub fn with_version(mut self, version: VersionParam) -> Self {
178        self.version = version;
179        self
180    }
181
182    pub fn with_block_number(mut self, block_number: u64) -> Self {
183        self.version = VersionParam::at_block(self.chain.into(), block_number);
184        self
185    }
186
187    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
188        self.page = page;
189        self.page_size = page_size;
190        self
191    }
192}
193
194/// Parameters for [`RPCClient::get_protocol_components`].
195#[derive(Clone, PartialEq, Debug)]
196pub struct ProtocolComponentsParams {
197    chain: Chain,
198    protocol_system: String,
199    component_ids: Option<Vec<ComponentId>>,
200    tvl_gt: Option<f64>,
201    page: i64,
202    page_size: i64,
203}
204
205impl ProtocolComponentsParams {
206    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
207        Self {
208            chain,
209            protocol_system: protocol_system.into(),
210            component_ids: None,
211            tvl_gt: None,
212            page: 0,
213            page_size: ProtocolComponentsRequestBody::MAX_PAGE_SIZE_COMPRESSED,
214        }
215    }
216
217    pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
218        self.component_ids = Some(ids);
219        self
220    }
221
222    pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
223        self.tvl_gt = Some(tvl_gt);
224        self
225    }
226
227    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
228        self.page = page;
229        self.page_size = page_size;
230        self
231    }
232
233    #[cfg(test)]
234    pub(crate) fn component_ids(&self) -> Option<&Vec<ComponentId>> {
235        self.component_ids.as_ref()
236    }
237}
238
239/// Parameters for [`RPCClient::get_protocol_states`].
240#[derive(Clone, PartialEq, Debug)]
241pub struct ProtocolStatesParams {
242    chain: Chain,
243    protocol_system: String,
244    protocol_ids: Option<Vec<String>>,
245    include_balances: bool,
246    version: VersionParam,
247    page: i64,
248    page_size: i64,
249}
250
251impl ProtocolStatesParams {
252    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
253        Self {
254            chain,
255            protocol_system: protocol_system.into(),
256            protocol_ids: None,
257            include_balances: false,
258            version: VersionParam::default(),
259            page: 0,
260            page_size: ProtocolStateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
261        }
262    }
263
264    pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
265        self.protocol_ids = Some(ids);
266        self
267    }
268
269    pub fn with_include_balances(mut self, include_balances: bool) -> Self {
270        self.include_balances = include_balances;
271        self
272    }
273
274    pub fn with_version(mut self, version: VersionParam) -> Self {
275        self.version = version;
276        self
277    }
278
279    pub fn with_block_number(mut self, block_number: u64) -> Self {
280        self.version = VersionParam::at_block(self.chain.into(), block_number);
281        self
282    }
283
284    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
285        self.page = page;
286        self.page_size = page_size;
287        self
288    }
289}
290
291/// Parameters for [`RPCClient::get_tokens`].
292#[derive(Clone, PartialEq, Debug)]
293pub struct TokensParams {
294    chain: Chain,
295    min_quality: Option<i32>,
296    traded_n_days_ago: Option<u64>,
297    page: i64,
298    page_size: i64,
299}
300
301impl TokensParams {
302    pub fn new(chain: Chain) -> Self {
303        Self {
304            chain,
305            min_quality: None,
306            traded_n_days_ago: None,
307            page: 0,
308            page_size: TokensRequestBody::MAX_PAGE_SIZE_COMPRESSED,
309        }
310    }
311
312    pub fn with_min_quality(mut self, min_quality: i32) -> Self {
313        self.min_quality = Some(min_quality);
314        self
315    }
316
317    pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
318        self.traded_n_days_ago = Some(days);
319        self
320    }
321
322    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
323        self.page = page;
324        self.page_size = page_size;
325        self
326    }
327}
328
329/// Parameters for [`RPCClient::get_protocol_systems`].
330#[derive(Clone, PartialEq, Debug)]
331pub struct ProtocolSystemsParams {
332    chain: Chain,
333    page: i64,
334    page_size: i64,
335}
336
337impl ProtocolSystemsParams {
338    pub fn new(chain: Chain) -> Self {
339        Self { chain, page: 0, page_size: ProtocolSystemsRequestBody::MAX_PAGE_SIZE_COMPRESSED }
340    }
341
342    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
343        self.page = page;
344        self.page_size = page_size;
345        self
346    }
347}
348
349/// Parameters for [`RPCClient::get_component_tvl`].
350#[derive(Clone, PartialEq, Debug)]
351pub struct ComponentTvlParams {
352    chain: Chain,
353    protocol_system: Option<String>,
354    component_ids: Option<Vec<String>>,
355    page: i64,
356    page_size: i64,
357}
358
359impl ComponentTvlParams {
360    pub fn new(chain: Chain) -> Self {
361        Self {
362            chain,
363            protocol_system: None,
364            component_ids: None,
365            page: 0,
366            page_size: ComponentTvlRequestBody::MAX_PAGE_SIZE_COMPRESSED,
367        }
368    }
369
370    pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
371        self.protocol_system = Some(protocol_system.into());
372        self
373    }
374
375    pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
376        self.component_ids = Some(ids);
377        self
378    }
379
380    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
381        self.page = page;
382        self.page_size = page_size;
383        self
384    }
385}
386
387/// Parameters for [`RPCClient::get_traced_entry_points`].
388#[derive(Clone, PartialEq, Debug)]
389pub struct TracedEntryPointsParams {
390    chain: Chain,
391    protocol_system: String,
392    component_ids: Option<Vec<String>>,
393    page: i64,
394    page_size: i64,
395}
396
397impl TracedEntryPointsParams {
398    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
399        Self {
400            chain,
401            protocol_system: protocol_system.into(),
402            component_ids: None,
403            page: 0,
404            page_size: TracedEntryPointRequestBody::MAX_PAGE_SIZE_COMPRESSED,
405        }
406    }
407
408    pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
409        self.component_ids = Some(ids);
410        self
411    }
412
413    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
414        self.page = page;
415        self.page_size = page_size;
416        self
417    }
418}
419
420/// Parameters for [`RPCClient::get_protocol_components_paginated`].
421#[derive(Clone, PartialEq, Debug)]
422pub struct ProtocolComponentsPaginatedParams {
423    chain: Chain,
424    protocol_system: String,
425    component_ids: Option<Vec<ComponentId>>,
426    tvl_gt: Option<f64>,
427    chunk_size: Option<usize>,
428    concurrency: usize,
429}
430
431impl ProtocolComponentsPaginatedParams {
432    pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
433        Self {
434            chain,
435            protocol_system: protocol_system.into(),
436            component_ids: None,
437            tvl_gt: None,
438            chunk_size: None,
439            concurrency,
440        }
441    }
442
443    pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
444        self.component_ids = Some(ids);
445        self
446    }
447
448    pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
449        self.tvl_gt = Some(tvl_gt);
450        self
451    }
452
453    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
454        self.chunk_size = Some(chunk_size);
455        self
456    }
457}
458
459/// Parameters for [`RPCClient::get_traced_entry_points_paginated`].
460#[derive(Clone, PartialEq, Debug)]
461pub struct TracedEntryPointsPaginatedParams {
462    chain: Chain,
463    protocol_system: String,
464    component_ids: Vec<String>,
465    chunk_size: Option<usize>,
466    concurrency: usize,
467}
468
469impl TracedEntryPointsPaginatedParams {
470    pub fn new(
471        chain: Chain,
472        protocol_system: impl Into<String>,
473        component_ids: Vec<String>,
474        concurrency: usize,
475    ) -> Self {
476        Self {
477            chain,
478            protocol_system: protocol_system.into(),
479            component_ids,
480            chunk_size: None,
481            concurrency,
482        }
483    }
484
485    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
486        self.chunk_size = Some(chunk_size);
487        self
488    }
489}
490
491/// Parameters for [`RPCClient::get_protocol_states_paginated`].
492#[derive(Clone, PartialEq, Debug)]
493pub struct ProtocolStatesPaginatedParams {
494    chain: Chain,
495    protocol_system: String,
496    protocol_ids: Vec<String>,
497    include_balances: bool,
498    version: VersionParam,
499    chunk_size: Option<usize>,
500    concurrency: usize,
501}
502
503impl ProtocolStatesPaginatedParams {
504    pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
505        Self {
506            chain,
507            protocol_system: protocol_system.into(),
508            protocol_ids: Vec::new(),
509            include_balances: true,
510            version: VersionParam::default(),
511            chunk_size: None,
512            concurrency,
513        }
514    }
515
516    pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
517        self.protocol_ids = ids;
518        self
519    }
520
521    pub fn with_include_balances(mut self, include_balances: bool) -> Self {
522        self.include_balances = include_balances;
523        self
524    }
525
526    pub fn with_version(mut self, version: VersionParam) -> Self {
527        self.version = version;
528        self
529    }
530
531    pub fn with_block_number(mut self, block_number: u64) -> Self {
532        self.version = VersionParam::at_block(self.chain.into(), block_number);
533        self
534    }
535
536    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
537        self.chunk_size = Some(chunk_size);
538        self
539    }
540}
541
542/// Parameters for [`RPCClient::get_all_tokens`].
543#[derive(Clone, PartialEq, Debug)]
544pub struct AllTokensParams {
545    chain: Chain,
546    min_quality: Option<i32>,
547    traded_n_days_ago: Option<u64>,
548    chunk_size: Option<usize>,
549    concurrency: usize,
550}
551
552impl AllTokensParams {
553    pub fn new(chain: Chain, concurrency: usize) -> Self {
554        Self { chain, min_quality: None, traded_n_days_ago: None, chunk_size: None, concurrency }
555    }
556
557    pub fn with_min_quality(mut self, min_quality: i32) -> Self {
558        self.min_quality = Some(min_quality);
559        self
560    }
561
562    pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
563        self.traded_n_days_ago = Some(days);
564        self
565    }
566
567    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
568        self.chunk_size = Some(chunk_size);
569        self
570    }
571}
572
573/// Parameters for [`RPCClient::get_component_tvl_paginated`].
574#[derive(Clone, PartialEq, Debug)]
575pub struct ComponentTvlPaginatedParams {
576    chain: Chain,
577    protocol_system: Option<String>,
578    component_ids: Option<Vec<String>>,
579    chunk_size: Option<usize>,
580    concurrency: usize,
581}
582
583impl ComponentTvlPaginatedParams {
584    pub fn new(chain: Chain, concurrency: usize) -> Self {
585        Self { chain, protocol_system: None, component_ids: None, chunk_size: None, concurrency }
586    }
587
588    pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
589        self.protocol_system = Some(protocol_system.into());
590        self
591    }
592
593    pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
594        self.component_ids = Some(ids);
595        self
596    }
597
598    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
599        self.chunk_size = Some(chunk_size);
600        self
601    }
602}
603
604/// Parameters for [`RPCClient::get_contract_state_paginated`].
605#[derive(Clone, PartialEq, Debug)]
606pub struct ContractStatePaginatedParams {
607    chain: Chain,
608    protocol_system: String,
609    contract_ids: Vec<Bytes>,
610    version: VersionParam,
611    chunk_size: Option<usize>,
612    concurrency: usize,
613}
614
615impl ContractStatePaginatedParams {
616    pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
617        Self {
618            chain,
619            protocol_system: protocol_system.into(),
620            contract_ids: Vec::new(),
621            version: VersionParam::default(),
622            chunk_size: None,
623            concurrency,
624        }
625    }
626
627    pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
628        self.contract_ids = ids;
629        self
630    }
631
632    pub fn with_version(mut self, version: VersionParam) -> Self {
633        self.version = version;
634        self
635    }
636
637    pub fn with_block_number(mut self, block_number: u64) -> Self {
638        self.version = VersionParam::at_block(self.chain.into(), block_number);
639        self
640    }
641
642    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
643        self.chunk_size = Some(chunk_size);
644        self
645    }
646}
647
648/// Request body for fetching a snapshot of protocol states and VM storage.
649///
650/// This struct helps to coordinate fetching  multiple pieces of related data
651/// (protocol states, contract storage, TVL, entry points).
652#[derive(Clone, Debug, PartialEq)]
653pub struct SnapshotParameters<'a> {
654    /// Which chain to fetch snapshots for
655    pub chain: Chain,
656    /// Protocol system name, required for correct state resolution
657    pub protocol_system: &'a str,
658    /// Components to fetch protocol states for
659    pub components: &'a HashMap<ComponentId, ProtocolComponent>,
660    /// Traced entry points data mapped by component id (model types)
661    pub entrypoints: Option<&'a TracedEntryPoints>,
662    /// Contract addresses to fetch VM storage for
663    pub contract_ids: &'a [Bytes],
664    /// Block number for versioning
665    pub block_number: u64,
666    /// Whether to include balance information
667    pub include_balances: bool,
668    /// Whether to fetch TVL data
669    pub include_tvl: bool,
670}
671
672impl<'a> SnapshotParameters<'a> {
673    pub fn new(
674        chain: Chain,
675        protocol_system: &'a str,
676        components: &'a HashMap<ComponentId, ProtocolComponent>,
677        contract_ids: &'a [Bytes],
678        block_number: u64,
679    ) -> Self {
680        Self {
681            chain,
682            protocol_system,
683            components,
684            entrypoints: None,
685            contract_ids,
686            block_number,
687            include_balances: true,
688            include_tvl: true,
689        }
690    }
691
692    /// Set whether to include balance information (default: true)
693    pub fn include_balances(mut self, include_balances: bool) -> Self {
694        self.include_balances = include_balances;
695        self
696    }
697
698    /// Set whether to fetch TVL data (default: true)
699    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
700        self.include_tvl = include_tvl;
701        self
702    }
703
704    pub fn entrypoints(mut self, entrypoints: &'a TracedEntryPoints) -> Self {
705        self.entrypoints = Some(entrypoints);
706        self
707    }
708}
709
/// Errors returned by the RPC client.
///
/// String payloads are interpolated into the `Display` message of the respective variant.
#[derive(Error, Debug)]
pub enum RPCError {
    /// The passed tycho url failed to parse.
    #[error("Failed to parse URL: {0}. Error: {1}")]
    UrlParsing(String, String),

    /// The request data is not correctly formed.
    #[error("Failed to format request: {0}")]
    FormatRequest(String),

    /// Errors forwarded from the HTTP protocol.
    ///
    /// The `reqwest::Error` is exposed as the error source; only the string is displayed.
    #[error("Unexpected HTTP client error: {0}")]
    HttpClient(String, #[source] reqwest::Error),

    /// The response from the server could not be parsed correctly.
    #[error("Failed to parse response: {0}")]
    ParseResponse(String),

    /// The requested block is outside the server's retention window.
    #[error("Snapshot block is stale: {0}")]
    StaleBlock(String),

    /// The requested extractor does not exist on the server.
    #[error("Unknown extractor: {0}")]
    UnknownExtractor(String),

    /// Other fatal errors.
    #[error("Fatal error: {0}")]
    Fatal(String),

    /// The request was rate limited.
    ///
    /// NOTE(review): the optional timestamp is presumably when requests may resume — confirm
    /// against the code that constructs this variant.
    #[error("Rate limited until {0:?}")]
    RateLimited(Option<SystemTime>),

    /// The server could not be reached.
    #[error("Server unreachable: {0}")]
    ServerUnreachable(String),
}
746
747impl RPCError {
748    /// Converts an HTTP response body parse failure into the correct `RPCError`.
749    ///
750    /// The tycho server returns plain-text error messages (not JSON) when a requested block falls
751    /// outside its retention window. Detecting these here gives callers a typed signal to retry
752    /// with a more recent block rather than treating it as an unrecoverable parse failure.
753    ///
754    /// NOTE: The string matching below is coupled to the server's error message text. If those
755    /// messages change server-side this silently regresses to `ParseResponse`. Replace with a
756    /// structured error code if the server ever returns typed error responses.
757    fn from_parse_error(err: serde_json::Error, body: &str) -> Self {
758        if body.contains("version is older than") || body.contains("Could not find Block") {
759            RPCError::StaleBlock(body.to_string())
760        } else if body.starts_with("Unknown extractor:") {
761            RPCError::UnknownExtractor(body.to_string())
762        } else {
763            RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
764        }
765    }
766}
767
768#[cfg_attr(test, automock)]
769#[async_trait]
770pub trait RPCClient: Send + Sync {
    /// Returns whether compression is enabled for requests.
    ///
    /// The paginated helpers of this trait pass this flag to the request bodies'
    /// `effective_max_page_size` to pick their default chunk size.
    fn compression(&self) -> bool;
773
    /// Retrieves a single page of contract state for the contracts described by `params`.
    ///
    /// The version to query is taken from `params`: use
    /// [`ContractStateParams::with_block_number`] or [`ContractStateParams::with_version`] to
    /// pin it; otherwise `VersionParam::default()` is used.
    async fn get_contract_state(
        &self,
        params: ContractStateParams,
    ) -> Result<Page<Vec<Account>>, RPCError>;
781
782    /// Retrieves a snapshot of contract state for a set of contract IDs.
783    ///
784    /// If `chunk_size` is `None`, it defaults to the maximum page size.
785    async fn get_contract_state_paginated(
786        &self,
787        params: ContractStatePaginatedParams,
788    ) -> Result<Vec<Account>, RPCError> {
789        let semaphore = Arc::new(Semaphore::new(params.concurrency));
790
791        // Sort the ids to maximize server-side cache hits
792        let mut sorted_ids = params.contract_ids;
793        sorted_ids.sort();
794
795        let chunk_size = params
796            .chunk_size
797            .unwrap_or(StateRequestBody::effective_max_page_size(self.compression()) as usize);
798
799        let mut tasks = Vec::new();
800        for chunk in sorted_ids.chunks(chunk_size) {
801            let sem = semaphore.clone();
802            let base_params =
803                ContractStateParams::new(params.chain, params.protocol_system.as_str())
804                    .with_contract_ids(chunk.to_vec())
805                    .with_version(params.version.clone())
806                    .with_pagination(0, chunk_size as i64);
807            tasks.push(async move {
808                let _permit = sem
809                    .acquire()
810                    .await
811                    .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
812                self.get_contract_state(base_params)
813                    .await
814            });
815        }
816
817        let pages = try_join_all(tasks).await?;
818
819        let accounts = pages
820            .into_iter()
821            .flat_map(|p| p.into_iter())
822            .collect();
823
824        Ok(accounts)
825    }
826
    /// Retrieves a single page of protocol components matching the filters in `params`.
    ///
    /// Use [`ProtocolComponentsParams::with_component_ids`] to filter by specific IDs, or
    /// [`ProtocolComponentsParams::with_tvl_gt`] to set the `tvl_gt` filter.
    async fn get_protocol_components(
        &self,
        params: ProtocolComponentsParams,
    ) -> Result<Page<Vec<ProtocolComponent>>, RPCError>;
834
835    /// Retrieves protocol components, fetching all pages automatically.
836    ///
837    /// If `chunk_size` is `None`, it defaults to the maximum page size.
838    async fn get_protocol_components_paginated(
839        &self,
840        params: ProtocolComponentsPaginatedParams,
841    ) -> Result<Vec<ProtocolComponent>, RPCError> {
842        let chain = params.chain;
843        let protocol_system = params.protocol_system;
844        let component_ids = params.component_ids;
845        let tvl_gt = params.tvl_gt;
846        let chunk_size = params.chunk_size;
847        let concurrency = params.concurrency;
848
849        let semaphore = Arc::new(Semaphore::new(concurrency));
850
851        let chunk_size = chunk_size.unwrap_or(
852            ProtocolComponentsRequestBody::effective_max_page_size(self.compression()) as usize,
853        );
854
855        // If a set of component IDs is specified, the maximum return size is already known,
856        // allowing us to pre-compute the number of requests to be made.
857        match component_ids {
858            Some(ids) => {
859                let tasks: Vec<_> =
860                    ids.chunks(chunk_size)
861                        .enumerate()
862                        .map(|(index, chunk)| {
863                            let sem = semaphore.clone();
864                            let mut base =
865                                ProtocolComponentsParams::new(chain, protocol_system.as_str())
866                                    .with_component_ids(chunk.to_vec())
867                                    .with_pagination(index as i64, chunk_size as i64);
868                            if let Some(tvl) = tvl_gt {
869                                base = base.with_tvl_gt(tvl);
870                            }
871                            async move {
872                                let _permit = sem.acquire().await.map_err(|_| {
873                                    RPCError::Fatal("Semaphore dropped".to_string())
874                                })?;
875                                self.get_protocol_components(base).await
876                            }
877                        })
878                        .collect();
879
880                try_join_all(tasks)
881                    .await
882                    .map(|pages| pages.into_iter().flatten().collect())
883            }
884            None => {
885                // If no component ids are specified, we need to make requests based on the total
886                // number of results from the first response.
887                let mut base_params =
888                    ProtocolComponentsParams::new(chain, protocol_system.as_str())
889                        .with_pagination(0, chunk_size as i64);
890                if let Some(tvl) = tvl_gt {
891                    base_params = base_params.with_tvl_gt(tvl);
892                }
893
894                let first_page = self
895                    .get_protocol_components(base_params)
896                    .await?;
897
898                let total_items = first_page.total();
899                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
900
901                let mut all: Vec<ProtocolComponent> = first_page.into_data();
902
903                let mut page = 1;
904                while page < total_pages {
905                    let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
906
907                    let tasks: Vec<_> = (0..requests_in_this_iteration)
908                        .map(|iter| {
909                            let sem = semaphore.clone();
910                            let mut p =
911                                ProtocolComponentsParams::new(chain, protocol_system.as_str())
912                                    .with_pagination(page + iter, chunk_size as i64);
913                            if let Some(tvl) = tvl_gt {
914                                p = p.with_tvl_gt(tvl);
915                            }
916                            async move {
917                                let _permit = sem.acquire().await.map_err(|_| {
918                                    RPCError::Fatal("Semaphore dropped".to_string())
919                                })?;
920                                self.get_protocol_components(p).await
921                            }
922                        })
923                        .collect();
924
925                    let responses = try_join_all(tasks).await?;
926
927                    for resp in responses {
928                        all.extend(resp);
929                    }
930
931                    page += requests_in_this_iteration;
932                }
933                Ok(all)
934            }
935        }
936    }
937
    /// Retrieves a page of protocol component states.
    ///
    /// The version to query is taken from `params`: use
    /// [`ProtocolStatesParams::with_block_number`] or [`ProtocolStatesParams::with_version`] to
    /// pin it; otherwise `VersionParam::default()` is used.
    async fn get_protocol_states(
        &self,
        params: ProtocolStatesParams,
    ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError>;
945
946    /// Retrieves protocol states for a set of protocol IDs, fetching all pages automatically.
947    ///
948    /// If `chunk_size` is `None`, it defaults to the maximum page size.
949    async fn get_protocol_states_paginated(
950        &self,
951        params: ProtocolStatesPaginatedParams,
952    ) -> Result<Vec<ProtocolComponentState>, RPCError> {
953        let semaphore = Arc::new(Semaphore::new(params.concurrency));
954
955        let chunk_size =
956            params
957                .chunk_size
958                .unwrap_or(
959                    ProtocolStateRequestBody::effective_max_page_size(self.compression()) as usize
960                );
961
962        let tasks: Vec<_> = params
963            .protocol_ids
964            .chunks(chunk_size)
965            .map(|c| {
966                let sem = semaphore.clone();
967                let p = ProtocolStatesParams::new(params.chain, params.protocol_system.as_str())
968                    .with_protocol_ids(c.to_vec())
969                    .with_include_balances(params.include_balances)
970                    .with_version(params.version.clone())
971                    .with_pagination(0, chunk_size as i64);
972                async move {
973                    let _permit = sem
974                        .acquire()
975                        .await
976                        .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
977                    self.get_protocol_states(p).await
978                }
979            })
980            .collect();
981
982        try_join_all(tasks)
983            .await
984            .map(|pages| pages.into_iter().flatten().collect())
985    }
986
    /// Retrieves a page of tokens.
    ///
    /// Use `get_all_tokens` to fetch all matching tokens automatically.
    ///
    /// # Errors
    ///
    /// Returns an [`RPCError`] when the page cannot be retrieved.
    async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError>;
991
992    /// Retrieves all tokens matching the given criteria, fetching all pages automatically.
993    ///
994    /// If `chunk_size` is `None`, it defaults to the maximum page size.
995    async fn get_all_tokens(&self, params: AllTokensParams) -> Result<Vec<Token>, RPCError> {
996        let chunk_size = params
997            .chunk_size
998            .unwrap_or(TokensRequestBody::effective_max_page_size(self.compression()) as usize);
999
1000        let semaphore = Arc::new(Semaphore::new(params.concurrency));
1001
1002        let page_size: i64 = chunk_size.try_into().map_err(|_| {
1003            RPCError::FormatRequest("Failed to convert chunk_size into i64".to_string())
1004        })?;
1005
1006        let mut base_params = TokensParams::new(params.chain).with_pagination(0, page_size);
1007        if let Some(q) = params.min_quality {
1008            base_params = base_params.with_min_quality(q);
1009        }
1010        if let Some(d) = params.traded_n_days_ago {
1011            base_params = base_params.with_traded_n_days_ago(d);
1012        }
1013
1014        let first_page = self.get_tokens(base_params).await?;
1015        let total_pages = (first_page.total() as f64 / chunk_size as f64).ceil() as i64;
1016
1017        let mut all_tokens: Vec<Token> = first_page.into_data();
1018
1019        if total_pages <= 1 {
1020            return Ok(all_tokens);
1021        }
1022
1023        let tasks: Vec<_> = (1..total_pages)
1024            .map(|page| {
1025                let sem = semaphore.clone();
1026                let mut p = TokensParams::new(params.chain).with_pagination(page, page_size);
1027                if let Some(q) = params.min_quality {
1028                    p = p.with_min_quality(q);
1029                }
1030                if let Some(d) = params.traded_n_days_ago {
1031                    p = p.with_traded_n_days_ago(d);
1032                }
1033                async move {
1034                    let _permit = sem
1035                        .acquire()
1036                        .await
1037                        .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1038                    self.get_tokens(p).await
1039                }
1040            })
1041            .collect();
1042
1043        let pages = try_join_all(tasks).await?;
1044        for page in pages {
1045            all_tokens.extend(page);
1046        }
1047
1048        Ok(all_tokens)
1049    }
1050
    /// Retrieves the protocol systems known to the server.
    ///
    /// # Errors
    ///
    /// Returns an [`RPCError`] when the page cannot be retrieved.
    async fn get_protocol_systems(
        &self,
        params: ProtocolSystemsParams,
    ) -> Result<Page<ProtocolSystems>, RPCError>;
1056
    /// Retrieves component TVL values.
    ///
    /// Filter by `component_ids` or by `protocol_system`; both are optional.
    ///
    /// The returned page maps string keys to TVL values.
    /// NOTE(review): the keys are presumably component IDs — confirm against the server API.
    ///
    /// # Errors
    ///
    /// Returns an [`RPCError`] when the page cannot be retrieved.
    async fn get_component_tvl(
        &self,
        params: ComponentTvlParams,
    ) -> Result<Page<HashMap<String, f64>>, RPCError>;
1064
1065    /// Retrieves component TVL values, fetching all pages automatically.
1066    ///
1067    /// If `chunk_size` is `None`, it defaults to the maximum page size.
1068    async fn get_component_tvl_paginated(
1069        &self,
1070        params: ComponentTvlPaginatedParams,
1071    ) -> Result<HashMap<String, f64>, RPCError> {
1072        let semaphore = Arc::new(Semaphore::new(params.concurrency));
1073
1074        let chunk_size =
1075            params
1076                .chunk_size
1077                .unwrap_or(
1078                    ComponentTvlRequestBody::effective_max_page_size(self.compression()) as usize
1079                );
1080
1081        match params.component_ids {
1082            Some(ids) => {
1083                let tasks: Vec<_> =
1084                    ids.chunks(chunk_size)
1085                        .enumerate()
1086                        .map(|(index, chunk)| {
1087                            let sem = semaphore.clone();
1088                            let mut p = ComponentTvlParams::new(params.chain)
1089                                .with_component_ids(chunk.to_vec())
1090                                .with_pagination(index as i64, chunk_size as i64);
1091                            if let Some(ref ps) = params.protocol_system {
1092                                p = p.with_protocol_system(ps.as_str());
1093                            }
1094                            async move {
1095                                let _permit = sem.acquire().await.map_err(|_| {
1096                                    RPCError::Fatal("Semaphore dropped".to_string())
1097                                })?;
1098                                self.get_component_tvl(p).await
1099                            }
1100                        })
1101                        .collect();
1102
1103                let pages = try_join_all(tasks).await?;
1104
1105                let mut merged_tvl = HashMap::new();
1106                for page in pages {
1107                    for (key, value) in page {
1108                        *merged_tvl.entry(key).or_insert(0.0) = value;
1109                    }
1110                }
1111
1112                Ok(merged_tvl)
1113            }
1114            None => {
1115                let mut base =
1116                    ComponentTvlParams::new(params.chain).with_pagination(0, chunk_size as i64);
1117                if let Some(ref ps) = params.protocol_system {
1118                    base = base.with_protocol_system(ps.as_str());
1119                }
1120
1121                let first_page = self.get_component_tvl(base).await?;
1122                let total_items = first_page.total();
1123                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
1124
1125                let mut merged_tvl: HashMap<String, f64> = first_page.into_data();
1126
1127                let mut page = 1;
1128                while page < total_pages {
1129                    let requests_in_this_iteration =
1130                        (total_pages - page).min(params.concurrency as i64);
1131
1132                    let tasks: Vec<_> = (0..requests_in_this_iteration)
1133                        .map(|i| {
1134                            let sem = semaphore.clone();
1135                            let mut p = ComponentTvlParams::new(params.chain)
1136                                .with_pagination(page + i, chunk_size as i64);
1137                            if let Some(ref ps) = params.protocol_system {
1138                                p = p.with_protocol_system(ps.as_str());
1139                            }
1140                            async move {
1141                                let _permit = sem.acquire().await.map_err(|_| {
1142                                    RPCError::Fatal("Semaphore dropped".to_string())
1143                                })?;
1144                                self.get_component_tvl(p).await
1145                            }
1146                        })
1147                        .collect();
1148
1149                    let responses = try_join_all(tasks).await?;
1150
1151                    for resp in responses {
1152                        for (key, value) in resp {
1153                            *merged_tvl.entry(key).or_insert(0.0) = value;
1154                        }
1155                    }
1156
1157                    page += requests_in_this_iteration;
1158                }
1159
1160                Ok(merged_tvl)
1161            }
1162        }
1163    }
1164
    /// Retrieves a page of traced entry points.
    ///
    /// Use `get_traced_entry_points_paginated` to fetch all pages automatically.
    ///
    /// # Errors
    ///
    /// Returns an [`RPCError`] when the page cannot be retrieved.
    async fn get_traced_entry_points(
        &self,
        params: TracedEntryPointsParams,
    ) -> Result<Page<TracedEntryPoints>, RPCError>;
1172
1173    /// Retrieves traced entry points for a set of component IDs, fetching all pages automatically.
1174    ///
1175    /// If `chunk_size` is `None`, it defaults to the maximum page size.
1176    async fn get_traced_entry_points_paginated(
1177        &self,
1178        params: TracedEntryPointsPaginatedParams,
1179    ) -> Result<TracedEntryPoints, RPCError> {
1180        let chain = params.chain;
1181        let protocol_system = params.protocol_system;
1182        let component_ids = params.component_ids;
1183        let chunk_size = params.chunk_size;
1184        let concurrency = params.concurrency;
1185
1186        let semaphore = Arc::new(Semaphore::new(concurrency));
1187
1188        let chunk_size = chunk_size.unwrap_or(
1189            TracedEntryPointRequestBody::effective_max_page_size(self.compression()) as usize,
1190        );
1191
1192        let tasks: Vec<_> = component_ids
1193            .chunks(chunk_size)
1194            .map(|c| {
1195                let sem = semaphore.clone();
1196                let params = TracedEntryPointsParams::new(chain, protocol_system.as_str())
1197                    .with_component_ids(c.to_vec())
1198                    .with_pagination(0, chunk_size as i64);
1199                async move {
1200                    let _permit = sem
1201                        .acquire()
1202                        .await
1203                        .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1204                    self.get_traced_entry_points(params)
1205                        .await
1206                }
1207            })
1208            .collect();
1209
1210        try_join_all(tasks)
1211            .await
1212            .map(|pages| pages.into_iter().flatten().collect())
1213    }
1214
    /// Retrieves a [`Snapshot`] for the given snapshot parameters.
    ///
    /// NOTE(review): `chunk_size` and `concurrency` presumably bound the page size and the number
    /// of parallel requests, as in the `*_paginated` helpers of this trait — confirm against the
    /// implementations.
    ///
    /// # Errors
    ///
    /// Returns an [`RPCError`] when the snapshot cannot be retrieved.
    async fn get_snapshots<'a>(
        &self,
        request: &SnapshotParameters<'a>,
        chunk_size: Option<usize>,
        concurrency: usize,
    ) -> Result<Snapshot, RPCError>;
1221}
1222
/// Configuration options for HttpRPCClient
///
/// The `Debug` representation never contains the authentication key itself, so option values
/// can be logged safely.
#[derive(Clone)]
pub struct HttpRPCClientOptions {
    /// Optional API key for authentication
    pub auth_key: Option<String>,
    /// Enable compression for requests (default: true)
    /// When enabled, adds Accept-Encoding: zstd header
    pub compression: bool,
}

impl std::fmt::Debug for HttpRPCClientOptions {
    /// Formats the options, replacing a configured key with a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The key is a credential: only reveal whether one is set.
        let auth_key = self.auth_key.as_ref().map(|_| "<redacted>");
        f.debug_struct("HttpRPCClientOptions")
            .field("auth_key", &auth_key)
            .field("compression", &self.compression)
            .finish()
    }
}

impl Default for HttpRPCClientOptions {
    fn default() -> Self {
        Self::new()
    }
}

impl HttpRPCClientOptions {
    /// Create new options with default values (compression enabled)
    pub fn new() -> Self {
        Self { auth_key: None, compression: true }
    }

    /// Set the authentication key
    pub fn with_auth_key(mut self, auth_key: Option<String>) -> Self {
        self.auth_key = auth_key;
        self
    }

    /// Set whether to enable compression (default: true)
    pub fn with_compression(mut self, compression: bool) -> Self {
        self.compression = compression;
        self
    }
}
1257
/// HTTP implementation of [`RPCClient`] that talks to a Tycho server.
#[derive(Debug, Clone)]
pub struct HttpRPCClient {
    /// Underlying HTTP client, built with the default headers set up in `new`.
    http_client: Client,
    /// Base URL of the Tycho server; endpoint paths are appended to it.
    url: Url,
    /// Point in time until which new requests are delayed after the server rate limited us.
    retry_after: Arc<RwLock<Option<SystemTime>>>,
    /// Backoff policy applied when retrying failed requests.
    backoff_policy: ExponentialBackoff,
    /// Delay before retrying after the server was reported unreachable.
    server_restart_duration: Duration,
    /// Compression flag copied from the options this client was built from.
    compression: bool,
}
1267
1268impl HttpRPCClient {
1269    pub fn new(base_uri: &str, options: HttpRPCClientOptions) -> Result<Self, RPCError> {
1270        let uri = base_uri
1271            .parse::<Url>()
1272            .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
1273
1274        // Add default headers
1275        let mut headers = header::HeaderMap::new();
1276        headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
1277        let user_agent = format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION"));
1278        headers.insert(
1279            header::USER_AGENT,
1280            header::HeaderValue::from_str(&user_agent)
1281                .map_err(|e| RPCError::FormatRequest(format!("Invalid user agent format: {e}")))?,
1282        );
1283
1284        // Add Authorization if one is given
1285        if let Some(key) = options.auth_key.as_deref() {
1286            let mut auth_value = header::HeaderValue::from_str(key).map_err(|e| {
1287                RPCError::FormatRequest(format!("Invalid authorization key format: {e}"))
1288            })?;
1289            auth_value.set_sensitive(true);
1290            headers.insert(header::AUTHORIZATION, auth_value);
1291        }
1292
1293        let mut client_builder = ClientBuilder::new()
1294            .default_headers(headers)
1295            .http2_prior_knowledge();
1296
1297        // When compression is disabled, turn off all automatic compression
1298        if !options.compression {
1299            client_builder = client_builder.no_zstd();
1300        }
1301
1302        let client = client_builder
1303            .build()
1304            .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1305
1306        Ok(Self {
1307            http_client: client,
1308            url: uri,
1309            retry_after: Arc::new(RwLock::new(None)),
1310            backoff_policy: ExponentialBackoffBuilder::new()
1311                .with_initial_interval(Duration::from_millis(250))
1312                // increase backoff time by 75% each failure
1313                .with_multiplier(1.75)
1314                // keep retrying every 30s
1315                .with_max_interval(Duration::from_secs(30))
1316                // if all retries take longer than 2m, give up
1317                .with_max_elapsed_time(Some(Duration::from_secs(125)))
1318                .build(),
1319            server_restart_duration: Duration::from_secs(120),
1320            compression: options.compression,
1321        })
1322    }
1323
1324    #[cfg(test)]
1325    pub fn with_test_backoff_policy(mut self) -> Self {
1326        // Extremely short intervals for very fast testing
1327        self.backoff_policy = ExponentialBackoffBuilder::new()
1328            .with_initial_interval(Duration::from_millis(1))
1329            .with_multiplier(1.1)
1330            .with_max_interval(Duration::from_millis(5))
1331            .with_max_elapsed_time(Some(Duration::from_millis(50)))
1332            .build();
1333        self.server_restart_duration = Duration::from_millis(50);
1334        self
1335    }
1336
1337    /// Converts a error response to a Result.
1338    ///
1339    /// Raises an error if the response status code id 429, 502, 503 or 504. In the 429
1340    /// case it will try to look for a retry-after header an parse it accordingly. The
1341    /// parsed value is then passed as part of the error.
1342    async fn error_for_response(
1343        &self,
1344        response: reqwest::Response,
1345    ) -> Result<reqwest::Response, RPCError> {
1346        match response.status() {
1347            StatusCode::TOO_MANY_REQUESTS => {
1348                let retry_after_raw = response
1349                    .headers()
1350                    .get(reqwest::header::RETRY_AFTER)
1351                    .and_then(|h| h.to_str().ok())
1352                    .and_then(parse_retry_value);
1353
1354                let reason = response
1355                    .text()
1356                    .await
1357                    .unwrap_or_default();
1358                warn!(reason, retry_after = ?retry_after_raw, "Rate limited by server");
1359
1360                Err(RPCError::RateLimited(retry_after_raw))
1361            }
1362            StatusCode::BAD_GATEWAY |
1363            StatusCode::SERVICE_UNAVAILABLE |
1364            StatusCode::GATEWAY_TIMEOUT => Err(RPCError::ServerUnreachable(
1365                response
1366                    .text()
1367                    .await
1368                    .unwrap_or_else(|_| "Server Unreachable".to_string()),
1369            )),
1370            _ => Ok(response),
1371        }
1372    }
1373
1374    /// Classifies errors into transient or permanent ones.
1375    ///
1376    /// Transient errors are retried with a potential backoff, permanent ones are not.
1377    /// If the error is RateLimited, this method will set the self.retry_after value so
1378    /// future requests wait until the rate limit has been reset.
1379    async fn handle_error_for_backoff(&self, e: RPCError) -> backoff::Error<RPCError> {
1380        match e {
1381            RPCError::ServerUnreachable(_) => {
1382                backoff::Error::retry_after(e, self.server_restart_duration)
1383            }
1384            RPCError::RateLimited(Some(until)) => {
1385                let mut retry_after_guard = self.retry_after.write().await;
1386                *retry_after_guard = Some(
1387                    retry_after_guard
1388                        .unwrap_or(until)
1389                        .max(until),
1390                );
1391
1392                if let Ok(duration) = until.duration_since(SystemTime::now()) {
1393                    backoff::Error::retry_after(e, duration)
1394                } else {
1395                    e.into()
1396                }
1397            }
1398            RPCError::RateLimited(None) => e.into(),
1399            _ => backoff::Error::permanent(e),
1400        }
1401    }
1402
1403    /// Waits until the current rate limit time has passed.
1404    ///
1405    /// Only waits if there is a time and that time is in the future, else return
1406    /// immediately.
1407    async fn wait_until_retry_after(&self) {
1408        if let Some(&until) = self.retry_after.read().await.as_ref() {
1409            let now = SystemTime::now();
1410            if until > now {
1411                if let Ok(duration) = until.duration_since(now) {
1412                    sleep(duration).await
1413                }
1414            }
1415        }
1416    }
1417
1418    /// Makes a post request handling transient failures.
1419    ///
1420    /// If a retry-after header is received it will be respected. Else the configured
1421    /// backoff policy is used to deal with transient network or server errors.
1422    async fn make_post_request<T: Serialize + ?Sized>(
1423        &self,
1424        request: &T,
1425        uri: &String,
1426    ) -> Result<Response, RPCError> {
1427        self.wait_until_retry_after().await;
1428        let response = backoff::future::retry(self.backoff_policy.clone(), || async {
1429            let server_response = self
1430                .http_client
1431                .post(uri)
1432                .json(request)
1433                .send()
1434                .await
1435                .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1436
1437            match self
1438                .error_for_response(server_response)
1439                .await
1440            {
1441                Ok(response) => Ok(response),
1442                Err(e) => Err(self.handle_error_for_backoff(e).await),
1443            }
1444        })
1445        .await?;
1446        Ok(response)
1447    }
1448}
1449
1450fn parse_retry_value(val: &str) -> Option<SystemTime> {
1451    if let Ok(secs) = val.parse::<u64>() {
1452        return Some(SystemTime::now() + Duration::from_secs(secs));
1453    }
1454    if let Ok(date) = OffsetDateTime::parse(val, &Rfc2822) {
1455        return Some(date.into());
1456    }
1457    None
1458}
1459
1460#[async_trait]
1461impl RPCClient for HttpRPCClient {
    /// Returns the compression flag this client was constructed with.
    fn compression(&self) -> bool {
        self.compression
    }
1465
1466    #[instrument(skip(self))]
1467    async fn get_contract_state(
1468        &self,
1469        params: ContractStateParams,
1470    ) -> Result<Page<Vec<Account>>, RPCError> {
1471        if params
1472            .contract_ids
1473            .as_ref()
1474            .is_none_or(|ids| ids.is_empty())
1475        {
1476            warn!("No contract ids specified in request.");
1477        }
1478
1479        let request = StateRequestBody {
1480            contract_ids: params.contract_ids,
1481            protocol_system: params.protocol_system,
1482            chain: params.chain.into(),
1483            version: params.version,
1484            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1485        };
1486
1487        let uri = format!(
1488            "{}/{}/contract_state",
1489            self.url
1490                .to_string()
1491                .trim_end_matches('/'),
1492            TYCHO_SERVER_VERSION
1493        );
1494        debug!(%uri, "Sending contract_state request to Tycho server");
1495        trace!(?request, "Sending request to Tycho server");
1496        let response = self
1497            .make_post_request(&request, &uri)
1498            .await?;
1499        trace!(?response, "Received response from Tycho server");
1500
1501        let body = response
1502            .text()
1503            .await
1504            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1505        if body.is_empty() {
1506            // Pure native protocols will return empty contract states
1507            return Ok(Page::new(vec![], 0, 0, 0));
1508        }
1509
1510        let dto_response = serde_json::from_str::<StateRequestResponse>(&body)
1511            .map_err(|err| RPCError::from_parse_error(err, &body))?;
1512        trace!(?dto_response, "Received contract_state response from Tycho server");
1513
1514        let data: Vec<Account> = dto_response
1515            .accounts
1516            .into_iter()
1517            .map(Account::from)
1518            .collect();
1519        Ok(Page::new(
1520            data,
1521            dto_response.pagination.total,
1522            dto_response.pagination.page,
1523            dto_response.pagination.page_size,
1524        ))
1525    }
1526
1527    async fn get_protocol_components(
1528        &self,
1529        params: ProtocolComponentsParams,
1530    ) -> Result<Page<Vec<ProtocolComponent>>, RPCError> {
1531        let request = ProtocolComponentsRequestBody {
1532            protocol_system: params.protocol_system,
1533            component_ids: params.component_ids,
1534            tvl_gt: params.tvl_gt,
1535            chain: params.chain.into(),
1536            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1537        };
1538
1539        let uri = format!(
1540            "{}/{}/protocol_components",
1541            self.url
1542                .to_string()
1543                .trim_end_matches('/'),
1544            TYCHO_SERVER_VERSION,
1545        );
1546        debug!(%uri, "Sending protocol_components request to Tycho server");
1547        trace!(?request, "Sending request to Tycho server");
1548
1549        let response = self
1550            .make_post_request(&request, &uri)
1551            .await?;
1552
1553        trace!(?response, "Received response from Tycho server");
1554
1555        let body = response
1556            .text()
1557            .await
1558            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1559        let dto_response = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
1560            .map_err(|err| RPCError::from_parse_error(err, &body))?;
1561        trace!(?dto_response, "Received protocol_components response from Tycho server");
1562
1563        let data: Vec<ProtocolComponent> = dto_response
1564            .protocol_components
1565            .into_iter()
1566            .map(ProtocolComponent::from)
1567            .collect();
1568        Ok(Page::new(
1569            data,
1570            dto_response.pagination.total,
1571            dto_response.pagination.page,
1572            dto_response.pagination.page_size,
1573        ))
1574    }
1575
1576    async fn get_protocol_states(
1577        &self,
1578        params: ProtocolStatesParams,
1579    ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError> {
1580        if params
1581            .protocol_ids
1582            .as_ref()
1583            .is_none_or(|ids| ids.is_empty())
1584        {
1585            warn!("No protocol ids specified in request.");
1586        }
1587
1588        let request = ProtocolStateRequestBody {
1589            protocol_ids: params.protocol_ids,
1590            protocol_system: params.protocol_system,
1591            chain: params.chain.into(),
1592            include_balances: params.include_balances,
1593            version: params.version,
1594            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1595        };
1596
1597        let uri = format!(
1598            "{}/{}/protocol_state",
1599            self.url
1600                .to_string()
1601                .trim_end_matches('/'),
1602            TYCHO_SERVER_VERSION
1603        );
1604        debug!(%uri, "Sending protocol_states request to Tycho server");
1605        trace!(?request, "Sending request to Tycho server");
1606
1607        let response = self
1608            .make_post_request(&request, &uri)
1609            .await?;
1610        trace!(?response, "Received response from Tycho server");
1611
1612        let body = response
1613            .text()
1614            .await
1615            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1616
1617        if body.is_empty() {
1618            // Pure VM protocols will return empty states
1619            return Ok(Page::new(vec![], 0, 0, 0));
1620        }
1621
1622        let dto_response = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
1623            .map_err(|err| RPCError::from_parse_error(err, &body))?;
1624        trace!(?dto_response, "Received protocol_states response from Tycho server");
1625
1626        let data: Vec<ProtocolComponentState> = dto_response
1627            .states
1628            .into_iter()
1629            .map(ProtocolComponentState::from)
1630            .collect();
1631        Ok(Page::new(
1632            data,
1633            dto_response.pagination.total,
1634            dto_response.pagination.page,
1635            dto_response.pagination.page_size,
1636        ))
1637    }
1638
1639    async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError> {
1640        let request = TokensRequestBody {
1641            token_addresses: None,
1642            min_quality: params.min_quality,
1643            traded_n_days_ago: params.traded_n_days_ago,
1644            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1645            chain: params.chain.into(),
1646        };
1647
1648        let uri = format!(
1649            "{}/{}/tokens",
1650            self.url
1651                .to_string()
1652                .trim_end_matches('/'),
1653            TYCHO_SERVER_VERSION
1654        );
1655        debug!(%uri, "Sending tokens request to Tycho server");
1656
1657        let response = self
1658            .make_post_request(&request, &uri)
1659            .await?;
1660
1661        let body = response
1662            .text()
1663            .await
1664            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1665        let dto_response = serde_json::from_str::<TokensRequestResponse>(&body)
1666            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1667
1668        let data: Vec<Token> = dto_response
1669            .tokens
1670            .into_iter()
1671            .map(Token::from)
1672            .collect();
1673        Ok(Page::new(
1674            data,
1675            dto_response.pagination.total,
1676            dto_response.pagination.page,
1677            dto_response.pagination.page_size,
1678        ))
1679    }
1680
1681    async fn get_protocol_systems(
1682        &self,
1683        params: ProtocolSystemsParams,
1684    ) -> Result<Page<ProtocolSystems>, RPCError> {
1685        let request = ProtocolSystemsRequestBody {
1686            chain: params.chain.into(),
1687            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1688        };
1689
1690        let uri = format!(
1691            "{}/{}/protocol_systems",
1692            self.url
1693                .to_string()
1694                .trim_end_matches('/'),
1695            TYCHO_SERVER_VERSION
1696        );
1697        debug!(%uri, "Sending protocol_systems request to Tycho server");
1698        trace!(?request, "Sending request to Tycho server");
1699        let response = self
1700            .make_post_request(&request, &uri)
1701            .await?;
1702        trace!(?response, "Received response from Tycho server");
1703        let body = response
1704            .text()
1705            .await
1706            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1707        let dto = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
1708            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1709        trace!(?dto, "Received protocol_systems response from Tycho server");
1710        Ok(Page::new(
1711            ProtocolSystems::new(dto.protocol_systems, dto.dci_protocols),
1712            dto.pagination.total,
1713            dto.pagination.page,
1714            dto.pagination.page_size,
1715        ))
1716    }
1717
1718    async fn get_component_tvl(
1719        &self,
1720        params: ComponentTvlParams,
1721    ) -> Result<Page<HashMap<String, f64>>, RPCError> {
1722        let request = ComponentTvlRequestBody {
1723            chain: params.chain.into(),
1724            protocol_system: params.protocol_system,
1725            component_ids: params.component_ids,
1726            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1727        };
1728
1729        let uri = format!(
1730            "{}/{}/component_tvl",
1731            self.url
1732                .to_string()
1733                .trim_end_matches('/'),
1734            TYCHO_SERVER_VERSION
1735        );
1736        debug!(%uri, "Sending get_component_tvl request to Tycho server");
1737        trace!(?request, "Sending request to Tycho server");
1738        let response = self
1739            .make_post_request(&request, &uri)
1740            .await?;
1741        trace!(?response, "Received response from Tycho server");
1742        let body = response
1743            .text()
1744            .await
1745            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1746        let dto_response =
1747            serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
1748                error!("Failed to parse component_tvl response: {:?}", &body);
1749                RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1750            })?;
1751        trace!(?dto_response, "Received component_tvl response from Tycho server");
1752        Ok(Page::new(
1753            dto_response.tvl,
1754            dto_response.pagination.total,
1755            dto_response.pagination.page,
1756            dto_response.pagination.page_size,
1757        ))
1758    }
1759
1760    async fn get_traced_entry_points(
1761        &self,
1762        params: TracedEntryPointsParams,
1763    ) -> Result<Page<TracedEntryPoints>, RPCError> {
1764        let request = TracedEntryPointRequestBody {
1765            chain: params.chain.into(),
1766            protocol_system: params.protocol_system,
1767            component_ids: params.component_ids,
1768            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1769        };
1770
1771        let uri = format!(
1772            "{}/{TYCHO_SERVER_VERSION}/traced_entry_points",
1773            self.url
1774                .to_string()
1775                .trim_end_matches('/')
1776        );
1777        debug!(%uri, "Sending traced_entry_points request to Tycho server");
1778        trace!(?request, "Sending request to Tycho server");
1779
1780        let response = self
1781            .make_post_request(&request, &uri)
1782            .await?;
1783
1784        trace!(?response, "Received response from Tycho server");
1785
1786        let body = response
1787            .text()
1788            .await
1789            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1790        let dto_response =
1791            serde_json::from_str::<TracedEntryPointRequestResponse>(&body).map_err(|err| {
1792                error!("Failed to parse traced_entry_points response: {:?}", &body);
1793                RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1794            })?;
1795        trace!(?dto_response, "Received traced_entry_points response from Tycho server");
1796        let data: TracedEntryPoints = dto_response
1797            .traced_entry_points
1798            .into_iter()
1799            .map(|(k, v)| {
1800                (
1801                    k,
1802                    v.into_iter()
1803                        .map(|(ep, tr)| {
1804                            (EntryPointWithTracingParams::from(ep), TracingResult::from(tr))
1805                        })
1806                        .collect(),
1807                )
1808            })
1809            .collect();
1810        Ok(Page::new(
1811            data,
1812            dto_response.pagination.total,
1813            dto_response.pagination.page,
1814            dto_response.pagination.page_size,
1815        ))
1816    }
1817
1818    async fn get_snapshots<'a>(
1819        &self,
1820        request: &SnapshotParameters<'a>,
1821        chunk_size: Option<usize>,
1822        concurrency: usize,
1823    ) -> Result<Snapshot, RPCError> {
1824        let component_ids: Vec<_> = request
1825            .components
1826            .keys()
1827            .cloned()
1828            .collect();
1829
1830        let component_tvl = if request.include_tvl && !component_ids.is_empty() {
1831            self.get_component_tvl_paginated(
1832                ComponentTvlPaginatedParams::new(request.chain, concurrency)
1833                    .with_component_ids(component_ids.clone()),
1834            )
1835            .await?
1836        } else {
1837            HashMap::new()
1838        };
1839
1840        let version = VersionParam::at_block(request.chain.into(), request.block_number);
1841
1842        let mut protocol_states = if !component_ids.is_empty() {
1843            self.get_protocol_states_paginated(
1844                ProtocolStatesPaginatedParams::new(
1845                    request.chain,
1846                    request.protocol_system,
1847                    concurrency,
1848                )
1849                .with_protocol_ids(component_ids.clone())
1850                .with_include_balances(request.include_balances)
1851                .with_version(version.clone()),
1852            )
1853            .await?
1854            .into_iter()
1855            .map(|state| (state.component_id.clone(), state))
1856            .collect()
1857        } else {
1858            HashMap::new()
1859        };
1860
1861        // Convert to ComponentWithState, which includes entrypoint information.
1862        let states = request
1863            .components
1864            .values()
1865            .filter_map(|component| {
1866                if let Some(state) = protocol_states.remove(&component.id) {
1867                    Some((
1868                        component.id.clone(),
1869                        ComponentWithState {
1870                            state,
1871                            component: component.clone(),
1872                            component_tvl: component_tvl
1873                                .get(&component.id)
1874                                .cloned(),
1875                            entrypoints: request
1876                                .entrypoints
1877                                .as_ref()
1878                                .and_then(|map| map.get(&component.id))
1879                                .cloned()
1880                                .unwrap_or_default(),
1881                        },
1882                    ))
1883                } else if component_ids.contains(&component.id) {
1884                    // only emit error event if we requested this component
1885                    let component_id = &component.id;
1886                    error!(?component_id, "Missing state for native component!");
1887                    None
1888                } else {
1889                    None
1890                }
1891            })
1892            .collect();
1893
1894        let vm_storage = if !request.contract_ids.is_empty() {
1895            let mut cp_params = ContractStatePaginatedParams::new(
1896                request.chain,
1897                request.protocol_system,
1898                concurrency,
1899            )
1900            .with_contract_ids(request.contract_ids.to_vec())
1901            .with_version(version.clone());
1902            if let Some(cs) = chunk_size {
1903                cp_params = cp_params.with_chunk_size(cs);
1904            }
1905            let contract_states = self
1906                .get_contract_state_paginated(cp_params)
1907                .await?
1908                .into_iter()
1909                .map(|acc| (acc.address.clone(), acc))
1910                .collect::<HashMap<_, _>>();
1911
1912            trace!(states=?&contract_states, "Retrieved ContractState");
1913
1914            let contract_address_to_components = request
1915                .components
1916                .iter()
1917                .filter_map(|(id, comp)| {
1918                    if component_ids.contains(id) {
1919                        Some(
1920                            comp.contract_addresses
1921                                .iter()
1922                                .map(|address| (address.clone(), comp.id.clone())),
1923                        )
1924                    } else {
1925                        None
1926                    }
1927                })
1928                .flatten()
1929                .fold(HashMap::<Bytes, Vec<String>>::new(), |mut acc, (addr, c_id)| {
1930                    acc.entry(addr).or_default().push(c_id);
1931                    acc
1932                });
1933
1934            request
1935                .contract_ids
1936                .iter()
1937                .filter_map(|address| {
1938                    if let Some(state) = contract_states.get(address) {
1939                        Some((address.clone(), state.clone()))
1940                    } else if let Some(ids) = contract_address_to_components.get(address) {
1941                        // only emit error even if we did actually request this address
1942                        error!(
1943                            ?address,
1944                            ?ids,
1945                            "Component with lacking contract storage encountered!"
1946                        );
1947                        None
1948                    } else {
1949                        None
1950                    }
1951                })
1952                .collect()
1953        } else {
1954            HashMap::new()
1955        };
1956
1957        Ok(Snapshot { states, vm_storage })
1958    }
1959}
1960
1961#[cfg(test)]
1962mod tests {
1963    use std::{
1964        collections::{HashMap, HashSet},
1965        str::FromStr,
1966    };
1967
1968    use mockito::Server;
1969    use rstest::rstest;
1970    use tycho_common::models::blockchain::AddressStorageLocation;
1971
1972    use super::*;
1973
1974    // Dummy implementation of `get_protocol_states_paginated` for backwards compatibility testing
1975    // purposes
1976    impl MockRPCClient {
1977        #[allow(clippy::too_many_arguments)]
1978        async fn test_get_protocol_states_paginated<T>(
1979            &self,
1980            chain: Chain,
1981            ids: &[T],
1982            protocol_system: &str,
1983            include_balances: bool,
1984            block_number: Option<u64>,
1985            chunk_size: usize,
1986            _concurrency: usize,
1987        ) -> Vec<(Chain, Vec<String>, String, bool, Option<u64>, PaginationParams)>
1988        where
1989            T: AsRef<str> + Clone + Send + Sync + 'static,
1990        {
1991            ids.chunks(chunk_size)
1992                .map(|chunk| {
1993                    (
1994                        chain,
1995                        chunk
1996                            .iter()
1997                            .map(|id| id.as_ref().to_string())
1998                            .collect(),
1999                        protocol_system.to_string(),
2000                        include_balances,
2001                        block_number,
2002                        PaginationParams { page: 0, page_size: chunk_size as i64 },
2003                    )
2004                })
2005                .collect()
2006        }
2007    }
2008
/// Canned `contract_state` response body: a single Ethereum account at the zero address with
/// no slots, a native balance of `0x01f4`, and pagination metadata.
const GET_CONTRACT_STATE_RESP: &str = r#"
        {
            "accounts": [
                {
                    "chain": "ethereum",
                    "address": "0x0000000000000000000000000000000000000000",
                    "title": "",
                    "slots": {},
                    "native_balance": "0x01f4",
                    "token_balances": {},
                    "code": "0x00",
                    "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
                    "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
                    "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
                    "creation_tx": null
                }
            ],
            "pagination": {
                "page": 0,
                "page_size": 20,
                "total": 10
            }
        }
        "#;
2033
2034    #[rstest]
2035    #[case::string_input(vec![
2036        "id1".to_string(),
2037        "id2".to_string()
2038    ])]
2039    #[tokio::test]
2040    async fn test_get_protocol_states_paginated<T>(#[case] ids: Vec<T>)
2041    where
2042        T: AsRef<str> + Clone + Send + Sync + 'static,
2043    {
2044        let mock_client = MockRPCClient::new();
2045
2046        let request_args = mock_client
2047            .test_get_protocol_states_paginated(
2048                Chain::Ethereum,
2049                &ids,
2050                "test_system",
2051                true,
2052                None,
2053                2,
2054                2,
2055            )
2056            .await;
2057
2058        // Verify that the request args have been split into chunks correctly
2059        assert_eq!(request_args.len(), 1);
2060        assert_eq!(request_args[0].1.len(), 2);
2061    }
2062
2063    #[tokio::test]
2064    async fn test_get_contract_state() {
2065        let mut server = Server::new_async().await;
2066        let server_resp = GET_CONTRACT_STATE_RESP;
2067        // test that the response is deserialized correctly
2068        serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
2069
2070        let mocked_server = server
2071            .mock("POST", "/v1/contract_state")
2072            .expect(1)
2073            .with_body(server_resp)
2074            .create_async()
2075            .await;
2076
2077        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2078            .expect("create client");
2079
2080        let accounts = client
2081            .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
2082            .await
2083            .expect("get state");
2084
2085        mocked_server.assert();
2086        assert_eq!(accounts.data().len(), 1);
2087        assert_eq!(accounts.data()[0].slots, HashMap::new());
2088        assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
2089        assert_eq!(accounts.data()[0].code, [0].to_vec());
2090        assert_eq!(
2091            accounts.data()[0].code_hash,
2092            hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
2093                .unwrap()
2094        );
2095    }
2096
2097    #[tokio::test]
2098    async fn test_get_protocol_components() {
2099        let mut server = Server::new_async().await;
2100        let server_resp = r#"
2101        {
2102            "protocol_components": [
2103                {
2104                    "id": "State1",
2105                    "protocol_system": "ambient",
2106                    "protocol_type_name": "Pool",
2107                    "chain": "ethereum",
2108                    "tokens": [
2109                        "0x0000000000000000000000000000000000000000",
2110                        "0x0000000000000000000000000000000000000001"
2111                    ],
2112                    "contract_ids": [
2113                        "0x0000000000000000000000000000000000000000"
2114                    ],
2115                    "static_attributes": {
2116                        "attribute_1": "0x00000000000003e8"
2117                    },
2118                    "change": "Creation",
2119                    "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2120                    "created_at": "2022-01-01T00:00:00"
2121                }
2122            ],
2123            "pagination": {
2124                "page": 0,
2125                "page_size": 20,
2126                "total": 10
2127            }
2128        }
2129        "#;
2130        // test that the response is deserialized correctly
2131        serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
2132
2133        let mocked_server = server
2134            .mock("POST", "/v1/protocol_components")
2135            .expect(1)
2136            .with_body(server_resp)
2137            .create_async()
2138            .await;
2139
2140        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2141            .expect("create client");
2142
2143        let components = client
2144            .get_protocol_components(ProtocolComponentsParams::new(Chain::Ethereum, ""))
2145            .await
2146            .expect("get state");
2147
2148        mocked_server.assert();
2149        assert_eq!(components.data().len(), 1);
2150        assert_eq!(components.data()[0].id, "State1");
2151        assert_eq!(components.data()[0].protocol_system, "ambient");
2152        assert_eq!(components.data()[0].protocol_type_name, "Pool");
2153        assert_eq!(components.data()[0].tokens.len(), 2);
2154        let expected_attributes =
2155            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2156                .iter()
2157                .cloned()
2158                .collect::<HashMap<String, Bytes>>();
2159        assert_eq!(components.data()[0].static_attributes, expected_attributes);
2160    }
2161
2162    #[tokio::test]
2163    async fn test_get_protocol_states() {
2164        let mut server = Server::new_async().await;
2165        let server_resp = r#"
2166        {
2167            "states": [
2168                {
2169                    "component_id": "State1",
2170                    "attributes": {
2171                        "attribute_1": "0x00000000000003e8"
2172                    },
2173                    "balances": {
2174                        "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
2175                    }
2176                }
2177            ],
2178            "pagination": {
2179                "page": 0,
2180                "page_size": 20,
2181                "total": 10
2182            }
2183        }
2184        "#;
2185        // test that the response is deserialized correctly
2186        serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
2187
2188        let mocked_server = server
2189            .mock("POST", "/v1/protocol_state")
2190            .expect(1)
2191            .with_body(server_resp)
2192            .create_async()
2193            .await;
2194        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2195            .expect("create client");
2196
2197        let states = client
2198            .get_protocol_states(
2199                ProtocolStatesParams::new(Chain::Ethereum, "").with_include_balances(true),
2200            )
2201            .await
2202            .expect("get state");
2203
2204        mocked_server.assert();
2205        assert_eq!(states.data().len(), 1);
2206        assert_eq!(states.data()[0].component_id, "State1");
2207        let expected_attributes =
2208            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2209                .iter()
2210                .cloned()
2211                .collect::<HashMap<String, Bytes>>();
2212        assert_eq!(states.data()[0].attributes, expected_attributes);
2213        let expected_balances = [(
2214            Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
2215                .expect("Unsupported address format"),
2216            Bytes::from_str("0x01f4").unwrap(),
2217        )]
2218        .iter()
2219        .cloned()
2220        .collect::<HashMap<Bytes, Bytes>>();
2221        assert_eq!(states.data()[0].balances, expected_balances);
2222    }
2223
2224    #[tokio::test]
2225    async fn test_get_tokens() {
2226        let mut server = Server::new_async().await;
2227        let server_resp = r#"
2228        {
2229            "tokens": [
2230              {
2231                "chain": "ethereum",
2232                "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
2233                "symbol": "WETH",
2234                "decimals": 18,
2235                "tax": 0,
2236                "gas": [
2237                  29962
2238                ],
2239                "quality": 100
2240              },
2241              {
2242                "chain": "ethereum",
2243                "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
2244                "symbol": "USDC",
2245                "decimals": 6,
2246                "tax": 0,
2247                "gas": [
2248                  40652
2249                ],
2250                "quality": 100
2251              }
2252            ],
2253            "pagination": {
2254              "page": 0,
2255              "page_size": 20,
2256              "total": 10
2257            }
2258          }
2259        "#;
2260        // test that the response is deserialized correctly
2261        serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
2262
2263        let mocked_server = server
2264            .mock("POST", "/v1/tokens")
2265            .expect(1)
2266            .with_body(server_resp)
2267            .create_async()
2268            .await;
2269        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2270            .expect("create client");
2271
2272        let tokens = client
2273            .get_tokens(TokensParams::new(Chain::Ethereum))
2274            .await
2275            .expect("get tokens");
2276
2277        let expected = vec![
2278            Token {
2279                chain: tycho_common::models::Chain::Ethereum,
2280                address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
2281                symbol: "WETH".to_string(),
2282                decimals: 18,
2283                tax: 0,
2284                gas: vec![Some(29962)],
2285                quality: 100,
2286            },
2287            Token {
2288                chain: tycho_common::models::Chain::Ethereum,
2289                address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
2290                symbol: "USDC".to_string(),
2291                decimals: 6,
2292                tax: 0,
2293                gas: vec![Some(40652)],
2294                quality: 100,
2295            },
2296        ];
2297
2298        mocked_server.assert();
2299        assert_eq!(*tokens.data(), expected);
2300    }
2301
2302    #[rstest]
2303    #[case::with_dci(Some(vec!["system2"]), vec!["system2"])]
2304    #[case::backward_compat(None, vec![])]
2305    #[tokio::test]
2306    async fn test_get_protocol_systems(
2307        #[case] dci_protocols: Option<Vec<&str>>,
2308        #[case] expected_dci: Vec<&str>,
2309    ) {
2310        use serde_json::json;
2311
2312        let mut json_value = json!({
2313            "protocol_systems": ["system1", "system2"],
2314            "pagination": { "page": 0, "page_size": 20, "total": 2 }
2315        });
2316        if let Some(dci) = dci_protocols {
2317            json_value["dci_protocols"] = json!(dci);
2318        }
2319        let server_resp = serde_json::to_string(&json_value).unwrap();
2320
2321        let mut server = Server::new_async().await;
2322        let mocked_server = server
2323            .mock("POST", "/v1/protocol_systems")
2324            .expect(1)
2325            .with_body(&server_resp)
2326            .create_async()
2327            .await;
2328        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2329            .expect("create client");
2330
2331        let response = client
2332            .get_protocol_systems(ProtocolSystemsParams::new(Chain::Ethereum))
2333            .await
2334            .expect("get protocol systems");
2335
2336        mocked_server.assert();
2337        assert_eq!(response.data().protocol_systems(), ["system1", "system2"]);
2338        assert_eq!(response.data().dci_protocols(), expected_dci.as_slice());
2339    }
2340
2341    #[tokio::test]
2342    async fn test_get_component_tvl() {
2343        let mut server = Server::new_async().await;
2344        let server_resp = r#"
2345        {
2346            "tvl": {
2347                "component1": 100.0
2348            },
2349            "pagination": {
2350                "page": 0,
2351                "page_size": 20,
2352                "total": 10
2353            }
2354        }
2355        "#;
2356        // test that the response is deserialized correctly
2357        serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
2358
2359        let mocked_server = server
2360            .mock("POST", "/v1/component_tvl")
2361            .expect(1)
2362            .with_body(server_resp)
2363            .create_async()
2364            .await;
2365        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2366            .expect("create client");
2367
2368        let component_tvl = client
2369            .get_component_tvl(ComponentTvlParams::new(Chain::Ethereum))
2370            .await
2371            .expect("get component tvl");
2372
2373        mocked_server.assert();
2374        assert_eq!(component_tvl.data().get("component1"), Some(&100.0));
2375    }
2376
2377    #[tokio::test]
2378    async fn test_get_traced_entry_points() {
2379        let mut server = Server::new_async().await;
2380        let server_resp = r#"
2381        {
2382            "traced_entry_points": {
2383                "component_1": [
2384                    [
2385                        {
2386                            "entry_point": {
2387                                "external_id": "entrypoint_a",
2388                                "target": "0x0000000000000000000000000000000000000001",
2389                                "signature": "sig()"
2390                            },
2391                            "params": {
2392                                "method": "rpctracer",
2393                                "caller": "0x000000000000000000000000000000000000000a",
2394                                "calldata": "0x000000000000000000000000000000000000000b"
2395                            }
2396                        },
2397                        {
2398                            "retriggers": [
2399                                [
2400                                    "0x00000000000000000000000000000000000000aa",
2401                                    {"key": "0x0000000000000000000000000000000000000aaa", "offset": 12}
2402                                ]
2403                            ],
2404                            "accessed_slots": {
2405                                "0x0000000000000000000000000000000000aaaa": [
2406                                    "0x0000000000000000000000000000000000aaaa"
2407                                ]
2408                            }
2409                        }
2410                    ]
2411                ]
2412            },
2413            "pagination": {
2414                "page": 0,
2415                "page_size": 20,
2416                "total": 1
2417            }
2418        }
2419        "#;
2420        // test that the response is deserialized correctly
2421        serde_json::from_str::<TracedEntryPointRequestResponse>(server_resp).expect("deserialize");
2422
2423        let mocked_server = server
2424            .mock("POST", "/v1/traced_entry_points")
2425            .expect(1)
2426            .with_body(server_resp)
2427            .create_async()
2428            .await;
2429        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2430            .expect("create client");
2431
2432        let entrypoints = client
2433            .get_traced_entry_points(TracedEntryPointsParams::new(Chain::Ethereum, ""))
2434            .await
2435            .expect("get traced entry points");
2436
2437        mocked_server.assert();
2438        assert_eq!(entrypoints.data().len(), 1);
2439        let comp1_entrypoints = entrypoints
2440            .data()
2441            .get("component_1")
2442            .expect("component_1 entrypoints should exist");
2443        assert_eq!(comp1_entrypoints.len(), 1);
2444
2445        let (entrypoint, trace_result) = &comp1_entrypoints[0];
2446        assert_eq!(entrypoint.entry_point.external_id, "entrypoint_a");
2447        assert_eq!(
2448            entrypoint.entry_point.target,
2449            Bytes::from_str("0x0000000000000000000000000000000000000001").unwrap()
2450        );
2451        assert_eq!(entrypoint.entry_point.signature, "sig()");
2452        let tycho_common::models::blockchain::TracingParams::RPCTracer(rpc_params) =
2453            &entrypoint.params;
2454        assert_eq!(
2455            rpc_params.caller,
2456            Some(Bytes::from("0x000000000000000000000000000000000000000a"))
2457        );
2458        assert_eq!(rpc_params.calldata, Bytes::from("0x000000000000000000000000000000000000000b"));
2459
2460        assert_eq!(
2461            trace_result.retriggers,
2462            HashSet::from([(
2463                Bytes::from("0x00000000000000000000000000000000000000aa"),
2464                AddressStorageLocation::new(
2465                    Bytes::from("0x0000000000000000000000000000000000000aaa"),
2466                    12
2467                )
2468            )])
2469        );
2470        assert_eq!(trace_result.accessed_slots.len(), 1);
2471        assert_eq!(
2472            trace_result.accessed_slots,
2473            HashMap::from([(
2474                Bytes::from("0x0000000000000000000000000000000000aaaa"),
2475                HashSet::from([Bytes::from("0x0000000000000000000000000000000000aaaa")])
2476            )])
2477        );
2478    }
2479
2480    #[tokio::test]
2481    async fn test_parse_retry_value_numeric() {
2482        let result = parse_retry_value("60");
2483        assert!(result.is_some());
2484
2485        let expected_time = SystemTime::now() + Duration::from_secs(60);
2486        let actual_time = result.unwrap();
2487
2488        // Allow for small timing differences during test execution
2489        let diff = if actual_time > expected_time {
2490            actual_time
2491                .duration_since(expected_time)
2492                .unwrap()
2493        } else {
2494            expected_time
2495                .duration_since(actual_time)
2496                .unwrap()
2497        };
2498        assert!(diff < Duration::from_secs(1), "Time difference too large: {:?}", diff);
2499    }
2500
2501    #[tokio::test]
2502    async fn test_parse_retry_value_rfc2822() {
2503        // Use a fixed future date in RFC2822 format
2504        let rfc2822_date = "Sat, 01 Jan 2030 12:00:00 +0000";
2505        let result = parse_retry_value(rfc2822_date);
2506        assert!(result.is_some());
2507
2508        let parsed_time = result.unwrap();
2509        assert!(parsed_time > SystemTime::now());
2510    }
2511
2512    #[tokio::test]
2513    async fn test_parse_retry_value_invalid_formats() {
2514        // Test various invalid formats
2515        assert!(parse_retry_value("invalid").is_none());
2516        assert!(parse_retry_value("").is_none());
2517        assert!(parse_retry_value("not_a_number").is_none());
2518        assert!(parse_retry_value("Mon, 32 Jan 2030 25:00:00 +0000").is_none());
2519        // Invalid date
2520    }
2521
2522    #[tokio::test]
2523    async fn test_parse_retry_value_zero_seconds() {
2524        let result = parse_retry_value("0");
2525        assert!(result.is_some());
2526
2527        let expected_time = SystemTime::now();
2528        let actual_time = result.unwrap();
2529
2530        // Should be very close to current time
2531        let diff = if actual_time > expected_time {
2532            actual_time
2533                .duration_since(expected_time)
2534                .unwrap()
2535        } else {
2536            expected_time
2537                .duration_since(actual_time)
2538                .unwrap()
2539        };
2540        assert!(diff < Duration::from_secs(1));
2541    }
2542
2543    #[tokio::test]
2544    async fn test_error_for_response_rate_limited() {
2545        let mut server = Server::new_async().await;
2546        let mock = server
2547            .mock("GET", "/test")
2548            .with_status(429)
2549            .with_header("Retry-After", "60")
2550            .create_async()
2551            .await;
2552
2553        let client = reqwest::Client::new();
2554        let response = client
2555            .get(format!("{}/test", server.url()))
2556            .send()
2557            .await
2558            .unwrap();
2559
2560        let http_client =
2561            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2562                .unwrap()
2563                .with_test_backoff_policy();
2564        let result = http_client
2565            .error_for_response(response)
2566            .await;
2567
2568        mock.assert();
2569        assert!(matches!(result, Err(RPCError::RateLimited(_))));
2570        if let Err(RPCError::RateLimited(retry_after)) = result {
2571            assert!(retry_after.is_some());
2572        }
2573    }
2574
2575    #[tokio::test]
2576    async fn test_error_for_response_rate_limited_no_header() {
2577        let mut server = Server::new_async().await;
2578        let mock = server
2579            .mock("GET", "/test")
2580            .with_status(429)
2581            .create_async()
2582            .await;
2583
2584        let client = reqwest::Client::new();
2585        let response = client
2586            .get(format!("{}/test", server.url()))
2587            .send()
2588            .await
2589            .unwrap();
2590
2591        let http_client =
2592            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2593                .unwrap()
2594                .with_test_backoff_policy();
2595        let result = http_client
2596            .error_for_response(response)
2597            .await;
2598
2599        mock.assert();
2600        assert!(matches!(result, Err(RPCError::RateLimited(None))));
2601    }
2602
2603    #[tokio::test]
2604    async fn test_error_for_response_server_errors() {
2605        let test_cases =
2606            vec![(502, "Bad Gateway"), (503, "Service Unavailable"), (504, "Gateway Timeout")];
2607
2608        for (status_code, expected_body) in test_cases {
2609            let mut server = Server::new_async().await;
2610            let mock = server
2611                .mock("GET", "/test")
2612                .with_status(status_code)
2613                .with_body(expected_body)
2614                .create_async()
2615                .await;
2616
2617            let client = reqwest::Client::new();
2618            let response = client
2619                .get(format!("{}/test", server.url()))
2620                .send()
2621                .await
2622                .unwrap();
2623
2624            let http_client =
2625                HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2626                    .unwrap()
2627                    .with_test_backoff_policy();
2628            let result = http_client
2629                .error_for_response(response)
2630                .await;
2631
2632            mock.assert();
2633            assert!(matches!(result, Err(RPCError::ServerUnreachable(_))));
2634            if let Err(RPCError::ServerUnreachable(body)) = result {
2635                assert_eq!(body, expected_body);
2636            }
2637        }
2638    }
2639
2640    #[tokio::test]
2641    async fn test_error_for_response_success() {
2642        let mut server = Server::new_async().await;
2643        let mock = server
2644            .mock("GET", "/test")
2645            .with_status(200)
2646            .with_body("success")
2647            .create_async()
2648            .await;
2649
2650        let client = reqwest::Client::new();
2651        let response = client
2652            .get(format!("{}/test", server.url()))
2653            .send()
2654            .await
2655            .unwrap();
2656
2657        let http_client =
2658            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2659                .unwrap()
2660                .with_test_backoff_policy();
2661        let result = http_client
2662            .error_for_response(response)
2663            .await;
2664
2665        mock.assert();
2666        assert!(result.is_ok());
2667
2668        let response = result.unwrap();
2669        assert_eq!(response.status(), 200);
2670    }
2671
2672    #[tokio::test]
2673    async fn test_handle_error_for_backoff_server_unreachable() {
2674        let http_client =
2675            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2676                .unwrap()
2677                .with_test_backoff_policy();
2678        let error = RPCError::ServerUnreachable("Service down".to_string());
2679
2680        let backoff_error = http_client
2681            .handle_error_for_backoff(error)
2682            .await;
2683
2684        match backoff_error {
2685            backoff::Error::Transient { err: RPCError::ServerUnreachable(msg), retry_after } => {
2686                assert_eq!(msg, "Service down");
2687                assert_eq!(retry_after, Some(Duration::from_millis(50))); // Fast test duration
2688            }
2689            _ => panic!("Expected transient error for ServerUnreachable"),
2690        }
2691    }
2692
2693    #[tokio::test]
2694    async fn test_handle_error_for_backoff_rate_limited_with_retry_after() {
2695        let http_client =
2696            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2697                .unwrap()
2698                .with_test_backoff_policy();
2699        let future_time = SystemTime::now() + Duration::from_secs(30);
2700        let error = RPCError::RateLimited(Some(future_time));
2701
2702        let backoff_error = http_client
2703            .handle_error_for_backoff(error)
2704            .await;
2705
2706        match backoff_error {
2707            backoff::Error::Transient { err: RPCError::RateLimited(retry_after), .. } => {
2708                assert_eq!(retry_after, Some(future_time));
2709            }
2710            _ => panic!("Expected transient error for RateLimited"),
2711        }
2712
2713        // Verify that retry_after was stored in the client state
2714        let stored_retry_after = http_client.retry_after.read().await;
2715        assert_eq!(*stored_retry_after, Some(future_time));
2716    }
2717
2718    #[tokio::test]
2719    async fn test_handle_error_for_backoff_rate_limited_no_retry_after() {
2720        let http_client =
2721            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2722                .unwrap()
2723                .with_test_backoff_policy();
2724        let error = RPCError::RateLimited(None);
2725
2726        let backoff_error = http_client
2727            .handle_error_for_backoff(error)
2728            .await;
2729
2730        match backoff_error {
2731            backoff::Error::Transient { err: RPCError::RateLimited(None), .. } => {
2732                // This is expected - no retry-after still allows retries with default policy
2733            }
2734            _ => panic!("Expected transient error for RateLimited without retry-after"),
2735        }
2736    }
2737
2738    #[tokio::test]
2739    async fn test_handle_error_for_backoff_other_errors() {
2740        let http_client =
2741            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2742                .unwrap()
2743                .with_test_backoff_policy();
2744        let error = RPCError::ParseResponse("Invalid JSON".to_string());
2745
2746        let backoff_error = http_client
2747            .handle_error_for_backoff(error)
2748            .await;
2749
2750        match backoff_error {
2751            backoff::Error::Permanent(RPCError::ParseResponse(msg)) => {
2752                assert_eq!(msg, "Invalid JSON");
2753            }
2754            _ => panic!("Expected permanent error for ParseResponse"),
2755        }
2756    }
2757
2758    #[tokio::test]
2759    async fn test_wait_until_retry_after_no_retry_time() {
2760        let http_client =
2761            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2762                .unwrap()
2763                .with_test_backoff_policy();
2764
2765        let start = std::time::Instant::now();
2766        http_client
2767            .wait_until_retry_after()
2768            .await;
2769        let elapsed = start.elapsed();
2770
2771        // Should return immediately if no retry time is set
2772        assert!(elapsed < Duration::from_millis(100));
2773    }
2774
2775    #[tokio::test]
2776    async fn test_wait_until_retry_after_past_time() {
2777        let http_client =
2778            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2779                .unwrap()
2780                .with_test_backoff_policy();
2781
2782        // Set a retry time in the past
2783        let past_time = SystemTime::now() - Duration::from_secs(10);
2784        *http_client.retry_after.write().await = Some(past_time);
2785
2786        let start = std::time::Instant::now();
2787        http_client
2788            .wait_until_retry_after()
2789            .await;
2790        let elapsed = start.elapsed();
2791
2792        // Should return immediately if retry time is in the past
2793        assert!(elapsed < Duration::from_millis(100));
2794    }
2795
2796    #[tokio::test]
2797    async fn test_wait_until_retry_after_future_time() {
2798        let http_client =
2799            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2800                .unwrap()
2801                .with_test_backoff_policy();
2802
2803        // Set a retry time 100ms in the future
2804        let future_time = SystemTime::now() + Duration::from_millis(100);
2805        *http_client.retry_after.write().await = Some(future_time);
2806
2807        let start = std::time::Instant::now();
2808        http_client
2809            .wait_until_retry_after()
2810            .await;
2811        let elapsed = start.elapsed();
2812
2813        // Should wait approximately the specified duration
2814        assert!(elapsed >= Duration::from_millis(80)); // Allow some tolerance
2815        assert!(elapsed <= Duration::from_millis(200)); // Upper bound for test stability
2816    }
2817
2818    #[tokio::test]
2819    async fn test_make_post_request_success() {
2820        let mut server = Server::new_async().await;
2821        let server_resp = r#"{"success": true}"#;
2822
2823        let mock = server
2824            .mock("POST", "/test")
2825            .with_status(200)
2826            .with_body(server_resp)
2827            .create_async()
2828            .await;
2829
2830        let http_client =
2831            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2832                .unwrap()
2833                .with_test_backoff_policy();
2834        let request_body = serde_json::json!({"test": "data"});
2835        let uri = format!("{}/test", server.url());
2836
2837        let result = http_client
2838            .make_post_request(&request_body, &uri)
2839            .await;
2840
2841        mock.assert();
2842        assert!(result.is_ok());
2843
2844        let response = result.unwrap();
2845        assert_eq!(response.status(), 200);
2846        assert_eq!(response.text().await.unwrap(), server_resp);
2847    }
2848
2849    #[tokio::test]
2850    async fn test_make_post_request_retry_on_server_error() {
2851        let mut server = Server::new_async().await;
2852        // First request fails with 503, second succeeds
2853        let error_mock = server
2854            .mock("POST", "/test")
2855            .with_status(503)
2856            .with_body("Service Unavailable")
2857            .expect(1)
2858            .create_async()
2859            .await;
2860
2861        let success_mock = server
2862            .mock("POST", "/test")
2863            .with_status(200)
2864            .with_body(r#"{"success": true}"#)
2865            .expect(1)
2866            .create_async()
2867            .await;
2868
2869        let http_client =
2870            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2871                .unwrap()
2872                .with_test_backoff_policy();
2873        let request_body = serde_json::json!({"test": "data"});
2874        let uri = format!("{}/test", server.url());
2875
2876        let result = http_client
2877            .make_post_request(&request_body, &uri)
2878            .await;
2879
2880        error_mock.assert();
2881        success_mock.assert();
2882        assert!(result.is_ok());
2883    }
2884
2885    #[tokio::test]
2886    async fn test_make_post_request_respect_retry_after_header() {
2887        let mut server = Server::new_async().await;
2888
2889        // First request returns 429 with retry-after, second succeeds
2890        let rate_limit_mock = server
2891            .mock("POST", "/test")
2892            .with_status(429)
2893            .with_header("Retry-After", "1") // 1 second
2894            .expect(1)
2895            .create_async()
2896            .await;
2897
2898        let success_mock = server
2899            .mock("POST", "/test")
2900            .with_status(200)
2901            .with_body(r#"{"success": true}"#)
2902            .expect(1)
2903            .create_async()
2904            .await;
2905
2906        let http_client =
2907            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2908                .unwrap()
2909                .with_test_backoff_policy();
2910        let request_body = serde_json::json!({"test": "data"});
2911        let uri = format!("{}/test", server.url());
2912
2913        let start = std::time::Instant::now();
2914        let result = http_client
2915            .make_post_request(&request_body, &uri)
2916            .await;
2917        let elapsed = start.elapsed();
2918
2919        rate_limit_mock.assert();
2920        success_mock.assert();
2921        assert!(result.is_ok());
2922
2923        // Should have waited at least 1 second due to retry-after header
2924        assert!(elapsed >= Duration::from_millis(900)); // Allow some tolerance
2925        assert!(elapsed <= Duration::from_millis(2000)); // Upper bound for test stability
2926    }
2927
2928    #[tokio::test]
2929    async fn test_make_post_request_permanent_error() {
2930        let mut server = Server::new_async().await;
2931
2932        let mock = server
2933            .mock("POST", "/test")
2934            .with_status(400) // Bad Request - should not be retried
2935            .with_body("Bad Request")
2936            .expect(1)
2937            .create_async()
2938            .await;
2939
2940        let http_client =
2941            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2942                .unwrap()
2943                .with_test_backoff_policy();
2944        let request_body = serde_json::json!({"test": "data"});
2945        let uri = format!("{}/test", server.url());
2946
2947        let result = http_client
2948            .make_post_request(&request_body, &uri)
2949            .await;
2950
2951        mock.assert();
2952        assert!(result.is_ok()); // 400 doesn't trigger retry logic, just returns the response
2953
2954        let response = result.unwrap();
2955        assert_eq!(response.status(), 400);
2956    }
2957
2958    #[tokio::test]
2959    async fn test_concurrent_requests_with_different_retry_after() {
2960        let mut server = Server::new_async().await;
2961
2962        // First request gets rate limited with 1 second retry-after
2963        let rate_limit_mock_1 = server
2964            .mock("POST", "/test1")
2965            .with_status(429)
2966            .with_header("Retry-After", "1")
2967            .expect(1)
2968            .create_async()
2969            .await;
2970
2971        // Second request gets rate limited with 2 second retry-after
2972        let rate_limit_mock_2 = server
2973            .mock("POST", "/test2")
2974            .with_status(429)
2975            .with_header("Retry-After", "2")
2976            .expect(1)
2977            .create_async()
2978            .await;
2979
2980        // Success mocks for retries
2981        let success_mock_1 = server
2982            .mock("POST", "/test1")
2983            .with_status(200)
2984            .with_body(r#"{"result": "success1"}"#)
2985            .expect(1)
2986            .create_async()
2987            .await;
2988
2989        let success_mock_2 = server
2990            .mock("POST", "/test2")
2991            .with_status(200)
2992            .with_body(r#"{"result": "success2"}"#)
2993            .expect(1)
2994            .create_async()
2995            .await;
2996
2997        let http_client =
2998            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2999                .unwrap()
3000                .with_test_backoff_policy();
3001        let request_body = serde_json::json!({"test": "data"});
3002
3003        let uri1 = format!("{}/test1", server.url());
3004        let uri2 = format!("{}/test2", server.url());
3005
3006        // Start both requests concurrently
3007        let start = std::time::Instant::now();
3008        let (result1, result2) = tokio::join!(
3009            http_client.make_post_request(&request_body, &uri1),
3010            http_client.make_post_request(&request_body, &uri2)
3011        );
3012        let elapsed = start.elapsed();
3013
3014        rate_limit_mock_1.assert();
3015        rate_limit_mock_2.assert();
3016        success_mock_1.assert();
3017        success_mock_2.assert();
3018
3019        assert!(result1.is_ok());
3020        assert!(result2.is_ok());
3021
3022        // Both requests should succeed, but the second should take longer due to the 2s retry-after
3023        // The total time should be at least 2 seconds since the shared retry_after state
3024        // gets updated by both requests
3025        assert!(elapsed >= Duration::from_millis(1800)); // Allow some tolerance
3026        assert!(elapsed <= Duration::from_millis(3000)); // Upper bound
3027
3028        // Check the final retry_after state - should be the latest (higher) value
3029        let final_retry_after = http_client.retry_after.read().await;
3030        assert!(final_retry_after.is_some());
3031
3032        // The retry_after should be set to the latest (higher) value from the two requests
3033        if let Some(retry_time) = *final_retry_after {
3034            // The retry_after time might be in the past now since we waited,
3035            // but it should be reasonable (not too far in past/future)
3036            let now = SystemTime::now();
3037            let diff = if retry_time > now {
3038                retry_time.duration_since(now).unwrap()
3039            } else {
3040                now.duration_since(retry_time).unwrap()
3041            };
3042
3043            // Should be within a reasonable range (the 2s retry-after plus some buffer)
3044            assert!(diff <= Duration::from_secs(3), "Retry time difference too large: {:?}", diff);
3045        }
3046    }
3047
3048    #[tokio::test]
3049    async fn test_get_snapshots() {
3050        let mut server = Server::new_async().await;
3051
3052        // Mock protocol states response
3053        let protocol_states_resp = r#"
3054        {
3055            "states": [
3056                {
3057                    "component_id": "component1",
3058                    "attributes": {
3059                        "attribute_1": "0x00000000000003e8"
3060                    },
3061                    "balances": {
3062                        "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
3063                    }
3064                }
3065            ],
3066            "pagination": {
3067                "page": 0,
3068                "page_size": 100,
3069                "total": 1
3070            }
3071        }
3072        "#;
3073
3074        // Mock contract state response
3075        let contract_state_resp = r#"
3076        {
3077            "accounts": [
3078                {
3079                    "chain": "ethereum",
3080                    "address": "0x1111111111111111111111111111111111111111",
3081                    "title": "",
3082                    "slots": {},
3083                    "native_balance": "0x01f4",
3084                    "token_balances": {},
3085                    "code": "0x00",
3086                    "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
3087                    "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3088                    "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3089                    "creation_tx": null
3090                }
3091            ],
3092            "pagination": {
3093                "page": 0,
3094                "page_size": 100,
3095                "total": 1
3096            }
3097        }
3098        "#;
3099
3100        // Mock component TVL response
3101        let tvl_resp = r#"
3102        {
3103            "tvl": {
3104                "component1": 1000000.0
3105            },
3106            "pagination": {
3107                "page": 0,
3108                "page_size": 100,
3109                "total": 1
3110            }
3111        }
3112        "#;
3113
3114        let protocol_states_mock = server
3115            .mock("POST", "/v1/protocol_state")
3116            .expect(1)
3117            .with_body(protocol_states_resp)
3118            .create_async()
3119            .await;
3120
3121        let contract_state_mock = server
3122            .mock("POST", "/v1/contract_state")
3123            .expect(1)
3124            .with_body(contract_state_resp)
3125            .create_async()
3126            .await;
3127
3128        let tvl_mock = server
3129            .mock("POST", "/v1/component_tvl")
3130            .expect(1)
3131            .with_body(tvl_resp)
3132            .create_async()
3133            .await;
3134
3135        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3136            .expect("create client");
3137
3138        let component = tycho_common::models::protocol::ProtocolComponent {
3139            id: "component1".to_string(),
3140            protocol_system: "test_protocol".to_string(),
3141            protocol_type_name: "test_type".to_string(),
3142            chain: Chain::Ethereum,
3143            tokens: vec![],
3144            contract_addresses: vec![
3145                Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()
3146            ],
3147            static_attributes: HashMap::new(),
3148            change: tycho_common::models::ChangeType::Creation,
3149            creation_tx: Bytes::from_str(
3150                "0x0000000000000000000000000000000000000000000000000000000000000000",
3151            )
3152            .unwrap(),
3153            created_at: chrono::Utc::now().naive_utc(),
3154        };
3155
3156        let mut components = HashMap::new();
3157        components.insert("component1".to_string(), component);
3158
3159        let contract_ids =
3160            vec![Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()];
3161
3162        let request = SnapshotParameters::new(
3163            Chain::Ethereum,
3164            "test_protocol",
3165            &components,
3166            &contract_ids,
3167            12345,
3168        );
3169
3170        let response = client
3171            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3172            .await
3173            .expect("get snapshots");
3174
3175        // Verify all mocks were called
3176        protocol_states_mock.assert();
3177        contract_state_mock.assert();
3178        tvl_mock.assert();
3179
3180        // Assert states
3181        assert_eq!(response.states.len(), 1);
3182        assert!(response
3183            .states
3184            .contains_key("component1"));
3185
3186        // Check that the state has the expected TVL
3187        let component_state = response
3188            .states
3189            .get("component1")
3190            .unwrap();
3191        assert_eq!(component_state.component_tvl, Some(1000000.0));
3192
3193        // Assert VM storage
3194        assert_eq!(response.vm_storage.len(), 1);
3195        let contract_addr = Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap();
3196        assert!(response
3197            .vm_storage
3198            .contains_key(&contract_addr));
3199    }
3200
3201    #[tokio::test]
3202    async fn test_get_snapshots_empty_components() {
3203        let server = Server::new_async().await;
3204        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3205            .expect("create client");
3206
3207        let components = HashMap::new();
3208        let contract_ids = vec![];
3209
3210        let request = SnapshotParameters::new(
3211            Chain::Ethereum,
3212            "test_protocol",
3213            &components,
3214            &contract_ids,
3215            12345,
3216        );
3217
3218        let response = client
3219            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3220            .await
3221            .expect("get snapshots");
3222
3223        // Should return empty response without making any requests
3224        assert!(response.states.is_empty());
3225        assert!(response.vm_storage.is_empty());
3226    }
3227
3228    #[tokio::test]
3229    async fn test_get_snapshots_without_tvl() {
3230        let mut server = Server::new_async().await;
3231
3232        let protocol_states_resp = r#"
3233        {
3234            "states": [
3235                {
3236                    "component_id": "component1",
3237                    "attributes": {},
3238                    "balances": {}
3239                }
3240            ],
3241            "pagination": {
3242                "page": 0,
3243                "page_size": 100,
3244                "total": 1
3245            }
3246        }
3247        "#;
3248
3249        let protocol_states_mock = server
3250            .mock("POST", "/v1/protocol_state")
3251            .expect(1)
3252            .with_body(protocol_states_resp)
3253            .create_async()
3254            .await;
3255
3256        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3257            .expect("create client");
3258
3259        // Create test component
3260        let component = tycho_common::models::protocol::ProtocolComponent {
3261            id: "component1".to_string(),
3262            protocol_system: "test_protocol".to_string(),
3263            protocol_type_name: "test_type".to_string(),
3264            chain: Chain::Ethereum,
3265            tokens: vec![],
3266            contract_addresses: vec![],
3267            static_attributes: HashMap::new(),
3268            change: tycho_common::models::ChangeType::Creation,
3269            creation_tx: Bytes::from_str(
3270                "0x0000000000000000000000000000000000000000000000000000000000000000",
3271            )
3272            .unwrap(),
3273            created_at: chrono::Utc::now().naive_utc(),
3274        };
3275
3276        let mut components = HashMap::new();
3277        components.insert("component1".to_string(), component);
3278        let contract_ids = vec![];
3279
3280        let request = SnapshotParameters::new(
3281            Chain::Ethereum,
3282            "test_protocol",
3283            &components,
3284            &contract_ids,
3285            12345,
3286        )
3287        .include_balances(false)
3288        .include_tvl(false);
3289
3290        let response = client
3291            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3292            .await
3293            .expect("get snapshots");
3294
3295        // Verify only necessary mocks were called
3296        protocol_states_mock.assert();
3297        // No contract_state_mock.assert() since contract_ids is empty
3298        // No tvl_mock.assert() since include_tvl is false
3299
3300        assert_eq!(response.states.len(), 1);
3301        // Check that TVL is None since we didn't request it
3302        let component_state = response
3303            .states
3304            .get("component1")
3305            .unwrap();
3306        assert_eq!(component_state.component_tvl, None);
3307    }
3308
3309    #[tokio::test]
3310    async fn test_compression_enabled() {
3311        let mut server = Server::new_async().await;
3312        let server_resp = GET_CONTRACT_STATE_RESP;
3313
3314        // Compress the response using zstd
3315        let compressed_body =
3316            zstd::encode_all(server_resp.as_bytes(), 0).expect("compression failed");
3317
3318        let mocked_server = server
3319            .mock("POST", "/v1/contract_state")
3320            .expect(1)
3321            .with_header("Content-Encoding", "zstd")
3322            .with_body(compressed_body)
3323            .create_async()
3324            .await;
3325
3326        // Create client with compression enabled
3327        let client = HttpRPCClient::new(
3328            server.url().as_str(),
3329            HttpRPCClientOptions::new().with_compression(true),
3330        )
3331        .expect("create client");
3332
3333        let response = client
3334            .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3335            .await
3336            .expect("get state");
3337        let accounts = response;
3338
3339        mocked_server.assert();
3340        assert_eq!(accounts.data().len(), 1);
3341        assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3342    }
3343
3344    #[tokio::test]
3345    async fn test_compression_disabled() {
3346        let mut server = Server::new_async().await;
3347        let server_resp = GET_CONTRACT_STATE_RESP;
3348
3349        // Verify client does NOT send Accept-Encoding: zstd when compression is disabled
3350        // Instead, server should receive request without compression headers
3351        let mocked_server = server
3352            .mock("POST", "/v1/contract_state")
3353            .expect(1)
3354            .match_header("Accept-Encoding", mockito::Matcher::Missing)
3355            .with_status(200)
3356            .with_body(server_resp)
3357            .create_async()
3358            .await;
3359
3360        // Create client with compression disabled
3361        let client = HttpRPCClient::new(
3362            server.url().as_str(),
3363            HttpRPCClientOptions::new().with_compression(false),
3364        )
3365        .expect("create client");
3366
3367        let response = client
3368            .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3369            .await
3370            .expect("get state");
3371        let accounts = response;
3372
3373        // Verify the mock was called (client sent request without Accept-Encoding header)
3374        mocked_server.assert();
3375        assert_eq!(accounts.data().len(), 1);
3376        assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3377    }
3378
3379    #[rstest]
3380    #[case::single_page(2, 1000)]
3381    #[case::multiple_pages_within_concurrency(10, 2)]
3382    #[case::exceeds_concurrency_limit(60, 2)]
3383    #[tokio::test]
3384    async fn test_get_all_tokens_pagination_and_concurrency(
3385        #[case] total_tokens: usize,
3386        #[case] page_size: usize,
3387    ) {
3388        use std::sync::atomic::{AtomicUsize, Ordering};
3389
3390        let allowed_concurrency = 10;
3391
3392        let concurrent_requests = Arc::new(AtomicUsize::new(0));
3393        let max_concurrent = Arc::new(AtomicUsize::new(0));
3394
3395        let mut server = Server::new_async().await;
3396
3397        let total_pages = (total_tokens as f64 / page_size as f64).ceil() as i64;
3398
3399        // Mock all required pages
3400        for page in 0..total_pages {
3401            let concurrent = concurrent_requests.clone();
3402            let max_conc = max_concurrent.clone();
3403
3404            let tokens_in_page = {
3405                let start_idx = (page as usize) * page_size;
3406                let end_idx = ((page as usize + 1) * page_size).min(total_tokens);
3407                (start_idx..end_idx)
3408                    .map(|i| {
3409                        format!(
3410                            r#"{{
3411                            "chain": "ethereum",
3412                            "address": "0x{i:040x}",
3413                            "symbol": "TOKEN_{i}",
3414                            "decimals": 18,
3415                            "tax": 0,
3416                            "gas": [30000],
3417                            "quality": 100
3418                        }}"#
3419                        )
3420                    })
3421                    .collect::<Vec<_>>()
3422            };
3423
3424            let tokens_json = tokens_in_page.join(",");
3425            let response = format!(
3426                r#"{{
3427                    "tokens": [{tokens_json}],
3428                    "pagination": {{
3429                        "page": {page},
3430                        "page_size": {page_size},
3431                        "total": {total_tokens}
3432                    }}
3433                }}"#,
3434            );
3435
3436            server
3437                .mock("POST", "/v1/tokens")
3438                .expect(1)
3439                .with_chunked_body(move |w| {
3440                    // Track concurrent requests
3441                    let current = concurrent.fetch_add(1, Ordering::SeqCst);
3442                    max_conc.fetch_max(current + 1, Ordering::SeqCst);
3443
3444                    // Simulate some work to increase likelihood of concurrent requests
3445                    std::thread::sleep(Duration::from_millis(10));
3446
3447                    concurrent.fetch_sub(1, Ordering::SeqCst);
3448
3449                    w.write_all(response.as_bytes())
3450                })
3451                .create_async()
3452                .await;
3453        }
3454
3455        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3456            .expect("create client");
3457
3458        let tokens = client
3459            .get_all_tokens(
3460                AllTokensParams::new(Chain::Ethereum, allowed_concurrency)
3461                    .with_chunk_size(page_size),
3462            )
3463            .await
3464            .expect("get all tokens");
3465
3466        // Verify concurrency was respected
3467        let max = max_concurrent.load(Ordering::SeqCst);
3468        let expected_max_concurrency = (total_pages as usize)
3469            .saturating_sub(1)
3470            .min(allowed_concurrency);
3471        assert!(
3472            max <= allowed_concurrency,
3473            "Expected max concurrent requests <= {allowed_concurrency}, got {max}"
3474        );
3475
3476        // For cases with multiple pages, verify we actually used concurrency
3477        if total_pages > 1 && expected_max_concurrency > 1 {
3478            assert!(
3479                max > 0,
3480                "Expected some concurrent requests for multi-page response, got {max}"
3481            );
3482        }
3483
3484        // Verify we got all expected tokens
3485        assert_eq!(
3486            tokens.len(),
3487            total_tokens,
3488            "Expected {total_tokens} tokens, got {}",
3489            tokens.len()
3490        );
3491
3492        // Verify tokens are in the expected order
3493        for (i, token) in tokens.iter().enumerate() {
3494            assert_eq!(token.symbol, format!("TOKEN_{i}"), "Token at index {i} has wrong symbol");
3495        }
3496    }
3497}