Skip to main content

tycho_client/
rpc.rs

1//! # Tycho RPC Client
2//!
3//! The objective of this module is to provide swift and simplified access to the Remote Procedure
4//! Call (RPC) endpoints of Tycho. These endpoints are chiefly responsible for facilitating data
5//! queries, especially querying snapshots of data.
6use std::{
7    collections::HashMap,
8    sync::Arc,
9    time::{Duration, SystemTime},
10};
11
12use async_trait::async_trait;
13use backoff::{exponential::ExponentialBackoffBuilder, ExponentialBackoff};
14use futures03::future::try_join_all;
15#[cfg(test)]
16use mockall::automock;
17use reqwest::{header, Client, ClientBuilder, Response, StatusCode, Url};
18use serde::Serialize;
19use thiserror::Error;
20use time::{format_description::well_known::Rfc2822, OffsetDateTime};
21use tokio::{
22    sync::{RwLock, Semaphore},
23    time::sleep,
24};
25use tracing::{debug, error, instrument, trace, warn};
26use tycho_common::{
27    dto::{
28        ComponentTvlRequestBody, ComponentTvlRequestResponse, PaginationLimits, PaginationParams,
29        ProtocolComponentRequestResponse, ProtocolComponentsRequestBody, ProtocolStateRequestBody,
30        ProtocolStateRequestResponse, ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse,
31        StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
32        TracedEntryPointRequestBody, TracedEntryPointRequestResponse, VersionParam,
33    },
34    models::{
35        blockchain::{EntryPointWithTracingParams, TracedEntryPoints, TracingResult},
36        contract::Account,
37        protocol::{ProtocolComponent, ProtocolComponentState},
38        token::Token,
39        Chain, ComponentId,
40    },
41    Bytes,
42};
43
/// Data payload returned by `RPCClient::get_protocol_systems`.
///
/// Holds two lists of names: the protocol systems and the DCI protocols. Both are exposed
/// read-only through the accessor methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolSystems {
    protocol_systems: Vec<String>,
    dci_protocols: Vec<String>,
}

impl ProtocolSystems {
    /// Bundles the two name lists into a payload (crate-internal constructor).
    pub(crate) fn new(protocol_systems: Vec<String>, dci_protocols: Vec<String>) -> Self {
        ProtocolSystems { protocol_systems, dci_protocols }
    }

    /// Borrows the protocol system names.
    pub fn protocol_systems(&self) -> &[String] {
        self.protocol_systems.as_slice()
    }

    /// Borrows the DCI protocol names.
    pub fn dci_protocols(&self) -> &[String] {
        self.dci_protocols.as_slice()
    }
}
64
/// An RPC response page, bundling data with the pagination metadata the server returned.
///
/// `total`, `page` and `page_size` are stored exactly as passed to [`Page::new`]; no validation
/// or normalisation is applied.
#[derive(Debug, Clone, PartialEq)]
pub struct Page<T> {
    data: T,
    total: i64,
    page: i64,
    page_size: i64,
}

impl<T> Page<T> {
    /// Wraps `data` together with its pagination metadata.
    pub fn new(data: T, total: i64, page: i64, page_size: i64) -> Self {
        Self { data, total, page, page_size }
    }

    /// Borrows the payload of this page.
    pub fn data(&self) -> &T {
        let Self { data, .. } = self;
        data
    }

    /// Consumes the page and returns the payload, dropping the pagination metadata.
    pub fn into_data(self) -> T {
        let Self { data, .. } = self;
        data
    }

    /// The `total` value this page was created with.
    pub fn total(&self) -> i64 {
        self.total
    }

    /// The `page` value this page was created with.
    pub fn page(&self) -> i64 {
        self.page
    }

    /// The `page_size` value this page was created with.
    pub fn page_size(&self) -> i64 {
        self.page_size
    }
}

impl<T> Page<Vec<T>> {
    /// Number of elements in the wrapped vector.
    pub fn len(&self) -> usize {
        self.data().len()
    }

    /// `true` when the wrapped vector holds no elements.
    pub fn is_empty(&self) -> bool {
        self.data().is_empty()
    }
}

impl<K, V, S: std::hash::BuildHasher> Page<HashMap<K, V, S>> {
    /// Number of entries in the wrapped map.
    pub fn len(&self) -> usize {
        self.data().len()
    }

    /// `true` when the wrapped map holds no entries.
    pub fn is_empty(&self) -> bool {
        self.data().is_empty()
    }
}

/// Iterating a page by value iterates its payload by value.
impl<T: IntoIterator> IntoIterator for Page<T> {
    type Item = T::Item;
    type IntoIter = T::IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        self.into_data().into_iter()
    }
}

/// Iterating a borrowed page iterates the borrowed payload.
impl<'a, T> IntoIterator for &'a Page<T>
where
    &'a T: IntoIterator,
{
    type Item = <&'a T as IntoIterator>::Item;
    type IntoIter = <&'a T as IntoIterator>::IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        <&'a T as IntoIterator>::into_iter(&self.data)
    }
}
140
141use crate::{
142    feed::synchronizer::{ComponentWithState, Snapshot},
143    TYCHO_SERVER_VERSION,
144};
145
146/// Suggested concurrency level for RPC clients.
147pub const RPC_CLIENT_CONCURRENCY: usize = 4;
148
149/// Parameters for [`RPCClient::get_contract_state`].
150#[derive(Clone, PartialEq, Debug)]
151pub struct ContractStateParams {
152    chain: Chain,
153    protocol_system: String,
154    contract_ids: Option<Vec<Bytes>>,
155    version: VersionParam,
156    page: i64,
157    page_size: i64,
158}
159
160impl ContractStateParams {
161    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
162        Self {
163            chain,
164            protocol_system: protocol_system.into(),
165            contract_ids: None,
166            version: VersionParam::default(),
167            page: 0,
168            page_size: StateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
169        }
170    }
171
172    pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
173        self.contract_ids = Some(ids);
174        self
175    }
176
177    pub fn with_version(mut self, version: VersionParam) -> Self {
178        self.version = version;
179        self
180    }
181
182    pub fn with_block_number(mut self, block_number: u64) -> Self {
183        self.version = VersionParam::at_block(self.chain.into(), block_number);
184        self
185    }
186
187    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
188        self.page = page;
189        self.page_size = page_size;
190        self
191    }
192}
193
194/// Parameters for [`RPCClient::get_protocol_components`].
195#[derive(Clone, PartialEq, Debug)]
196pub struct ProtocolComponentsParams {
197    chain: Chain,
198    protocol_system: String,
199    component_ids: Option<Vec<ComponentId>>,
200    tvl_gt: Option<f64>,
201    page: i64,
202    page_size: i64,
203}
204
205impl ProtocolComponentsParams {
206    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
207        Self {
208            chain,
209            protocol_system: protocol_system.into(),
210            component_ids: None,
211            tvl_gt: None,
212            page: 0,
213            page_size: ProtocolComponentsRequestBody::MAX_PAGE_SIZE_COMPRESSED,
214        }
215    }
216
217    pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
218        self.component_ids = Some(ids);
219        self
220    }
221
222    pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
223        self.tvl_gt = Some(tvl_gt);
224        self
225    }
226
227    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
228        self.page = page;
229        self.page_size = page_size;
230        self
231    }
232
233    #[cfg(test)]
234    pub(crate) fn component_ids(&self) -> Option<&Vec<ComponentId>> {
235        self.component_ids.as_ref()
236    }
237}
238
239/// Parameters for [`RPCClient::get_protocol_states`].
240#[derive(Clone, PartialEq, Debug)]
241pub struct ProtocolStatesParams {
242    chain: Chain,
243    protocol_system: String,
244    protocol_ids: Option<Vec<String>>,
245    include_balances: bool,
246    version: VersionParam,
247    page: i64,
248    page_size: i64,
249}
250
251impl ProtocolStatesParams {
252    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
253        Self {
254            chain,
255            protocol_system: protocol_system.into(),
256            protocol_ids: None,
257            include_balances: false,
258            version: VersionParam::default(),
259            page: 0,
260            page_size: ProtocolStateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
261        }
262    }
263
264    pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
265        self.protocol_ids = Some(ids);
266        self
267    }
268
269    pub fn with_include_balances(mut self, include_balances: bool) -> Self {
270        self.include_balances = include_balances;
271        self
272    }
273
274    pub fn with_version(mut self, version: VersionParam) -> Self {
275        self.version = version;
276        self
277    }
278
279    pub fn with_block_number(mut self, block_number: u64) -> Self {
280        self.version = VersionParam::at_block(self.chain.into(), block_number);
281        self
282    }
283
284    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
285        self.page = page;
286        self.page_size = page_size;
287        self
288    }
289}
290
291/// Parameters for [`RPCClient::get_tokens`].
292#[derive(Clone, PartialEq, Debug)]
293pub struct TokensParams {
294    chain: Chain,
295    min_quality: Option<i32>,
296    traded_n_days_ago: Option<u64>,
297    page: i64,
298    page_size: i64,
299}
300
301impl TokensParams {
302    pub fn new(chain: Chain) -> Self {
303        Self {
304            chain,
305            min_quality: None,
306            traded_n_days_ago: None,
307            page: 0,
308            page_size: TokensRequestBody::MAX_PAGE_SIZE_COMPRESSED,
309        }
310    }
311
312    pub fn with_min_quality(mut self, min_quality: i32) -> Self {
313        self.min_quality = Some(min_quality);
314        self
315    }
316
317    pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
318        self.traded_n_days_ago = Some(days);
319        self
320    }
321
322    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
323        self.page = page;
324        self.page_size = page_size;
325        self
326    }
327}
328
329/// Parameters for [`RPCClient::get_protocol_systems`].
330#[derive(Clone, PartialEq, Debug)]
331pub struct ProtocolSystemsParams {
332    chain: Chain,
333    page: i64,
334    page_size: i64,
335}
336
337impl ProtocolSystemsParams {
338    pub fn new(chain: Chain) -> Self {
339        Self { chain, page: 0, page_size: ProtocolSystemsRequestBody::MAX_PAGE_SIZE_COMPRESSED }
340    }
341
342    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
343        self.page = page;
344        self.page_size = page_size;
345        self
346    }
347}
348
349/// Parameters for [`RPCClient::get_component_tvl`].
350#[derive(Clone, PartialEq, Debug)]
351pub struct ComponentTvlParams {
352    chain: Chain,
353    protocol_system: Option<String>,
354    component_ids: Option<Vec<String>>,
355    page: i64,
356    page_size: i64,
357}
358
359impl ComponentTvlParams {
360    pub fn new(chain: Chain) -> Self {
361        Self {
362            chain,
363            protocol_system: None,
364            component_ids: None,
365            page: 0,
366            page_size: ComponentTvlRequestBody::MAX_PAGE_SIZE_COMPRESSED,
367        }
368    }
369
370    pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
371        self.protocol_system = Some(protocol_system.into());
372        self
373    }
374
375    pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
376        self.component_ids = Some(ids);
377        self
378    }
379
380    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
381        self.page = page;
382        self.page_size = page_size;
383        self
384    }
385}
386
387/// Parameters for [`RPCClient::get_traced_entry_points`].
388#[derive(Clone, PartialEq, Debug)]
389pub struct TracedEntryPointsParams {
390    chain: Chain,
391    protocol_system: String,
392    component_ids: Option<Vec<String>>,
393    page: i64,
394    page_size: i64,
395}
396
397impl TracedEntryPointsParams {
398    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
399        Self {
400            chain,
401            protocol_system: protocol_system.into(),
402            component_ids: None,
403            page: 0,
404            page_size: TracedEntryPointRequestBody::MAX_PAGE_SIZE_COMPRESSED,
405        }
406    }
407
408    pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
409        self.component_ids = Some(ids);
410        self
411    }
412
413    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
414        self.page = page;
415        self.page_size = page_size;
416        self
417    }
418}
419
420/// Parameters for [`RPCClient::get_protocol_components_paginated`].
421#[derive(Clone, PartialEq, Debug)]
422pub struct ProtocolComponentsPaginatedParams {
423    chain: Chain,
424    protocol_system: String,
425    component_ids: Option<Vec<ComponentId>>,
426    tvl_gt: Option<f64>,
427    chunk_size: Option<usize>,
428    concurrency: usize,
429}
430
431impl ProtocolComponentsPaginatedParams {
432    pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
433        Self {
434            chain,
435            protocol_system: protocol_system.into(),
436            component_ids: None,
437            tvl_gt: None,
438            chunk_size: None,
439            concurrency,
440        }
441    }
442
443    pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
444        self.component_ids = Some(ids);
445        self
446    }
447
448    pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
449        self.tvl_gt = Some(tvl_gt);
450        self
451    }
452
453    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
454        self.chunk_size = Some(chunk_size);
455        self
456    }
457}
458
459/// Parameters for [`RPCClient::get_traced_entry_points_paginated`].
460#[derive(Clone, PartialEq, Debug)]
461pub struct TracedEntryPointsPaginatedParams {
462    chain: Chain,
463    protocol_system: String,
464    component_ids: Vec<String>,
465    chunk_size: Option<usize>,
466    concurrency: usize,
467}
468
469impl TracedEntryPointsPaginatedParams {
470    pub fn new(
471        chain: Chain,
472        protocol_system: impl Into<String>,
473        component_ids: Vec<String>,
474        concurrency: usize,
475    ) -> Self {
476        Self {
477            chain,
478            protocol_system: protocol_system.into(),
479            component_ids,
480            chunk_size: None,
481            concurrency,
482        }
483    }
484
485    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
486        self.chunk_size = Some(chunk_size);
487        self
488    }
489}
490
491/// Parameters for [`RPCClient::get_protocol_states_paginated`].
492#[derive(Clone, PartialEq, Debug)]
493pub struct ProtocolStatesPaginatedParams {
494    chain: Chain,
495    protocol_system: String,
496    protocol_ids: Vec<String>,
497    include_balances: bool,
498    version: VersionParam,
499    chunk_size: Option<usize>,
500    concurrency: usize,
501}
502
503impl ProtocolStatesPaginatedParams {
504    pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
505        Self {
506            chain,
507            protocol_system: protocol_system.into(),
508            protocol_ids: Vec::new(),
509            include_balances: true,
510            version: VersionParam::default(),
511            chunk_size: None,
512            concurrency,
513        }
514    }
515
516    pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
517        self.protocol_ids = ids;
518        self
519    }
520
521    pub fn with_include_balances(mut self, include_balances: bool) -> Self {
522        self.include_balances = include_balances;
523        self
524    }
525
526    pub fn with_version(mut self, version: VersionParam) -> Self {
527        self.version = version;
528        self
529    }
530
531    pub fn with_block_number(mut self, block_number: u64) -> Self {
532        self.version = VersionParam::at_block(self.chain.into(), block_number);
533        self
534    }
535
536    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
537        self.chunk_size = Some(chunk_size);
538        self
539    }
540}
541
542/// Parameters for [`RPCClient::get_all_tokens`].
543#[derive(Clone, PartialEq, Debug)]
544pub struct AllTokensParams {
545    chain: Chain,
546    min_quality: Option<i32>,
547    traded_n_days_ago: Option<u64>,
548    chunk_size: Option<usize>,
549    concurrency: usize,
550}
551
552impl AllTokensParams {
553    pub fn new(chain: Chain, concurrency: usize) -> Self {
554        Self { chain, min_quality: None, traded_n_days_ago: None, chunk_size: None, concurrency }
555    }
556
557    pub fn with_min_quality(mut self, min_quality: i32) -> Self {
558        self.min_quality = Some(min_quality);
559        self
560    }
561
562    pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
563        self.traded_n_days_ago = Some(days);
564        self
565    }
566
567    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
568        self.chunk_size = Some(chunk_size);
569        self
570    }
571}
572
573/// Parameters for [`RPCClient::get_component_tvl_paginated`].
574#[derive(Clone, PartialEq, Debug)]
575pub struct ComponentTvlPaginatedParams {
576    chain: Chain,
577    protocol_system: Option<String>,
578    component_ids: Option<Vec<String>>,
579    chunk_size: Option<usize>,
580    concurrency: usize,
581}
582
583impl ComponentTvlPaginatedParams {
584    pub fn new(chain: Chain, concurrency: usize) -> Self {
585        Self { chain, protocol_system: None, component_ids: None, chunk_size: None, concurrency }
586    }
587
588    pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
589        self.protocol_system = Some(protocol_system.into());
590        self
591    }
592
593    pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
594        self.component_ids = Some(ids);
595        self
596    }
597
598    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
599        self.chunk_size = Some(chunk_size);
600        self
601    }
602}
603
604/// Parameters for [`RPCClient::get_contract_state_paginated`].
605#[derive(Clone, PartialEq, Debug)]
606pub struct ContractStatePaginatedParams {
607    chain: Chain,
608    protocol_system: String,
609    contract_ids: Vec<Bytes>,
610    version: VersionParam,
611    chunk_size: Option<usize>,
612    concurrency: usize,
613}
614
615impl ContractStatePaginatedParams {
616    pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
617        Self {
618            chain,
619            protocol_system: protocol_system.into(),
620            contract_ids: Vec::new(),
621            version: VersionParam::default(),
622            chunk_size: None,
623            concurrency,
624        }
625    }
626
627    pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
628        self.contract_ids = ids;
629        self
630    }
631
632    pub fn with_version(mut self, version: VersionParam) -> Self {
633        self.version = version;
634        self
635    }
636
637    pub fn with_block_number(mut self, block_number: u64) -> Self {
638        self.version = VersionParam::at_block(self.chain.into(), block_number);
639        self
640    }
641
642    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
643        self.chunk_size = Some(chunk_size);
644        self
645    }
646}
647
648/// Request body for fetching a snapshot of protocol states and VM storage.
649///
650/// This struct helps to coordinate fetching  multiple pieces of related data
651/// (protocol states, contract storage, TVL, entry points).
652#[derive(Clone, Debug, PartialEq)]
653pub struct SnapshotParameters<'a> {
654    /// Which chain to fetch snapshots for
655    pub chain: Chain,
656    /// Protocol system name, required for correct state resolution
657    pub protocol_system: &'a str,
658    /// Components to fetch protocol states for
659    pub components: &'a HashMap<ComponentId, ProtocolComponent>,
660    /// Traced entry points data mapped by component id (model types)
661    pub entrypoints: Option<&'a TracedEntryPoints>,
662    /// Contract addresses to fetch VM storage for
663    pub contract_ids: &'a [Bytes],
664    /// Block number for versioning
665    pub block_number: u64,
666    /// Whether to include balance information
667    pub include_balances: bool,
668    /// Whether to fetch TVL data
669    pub include_tvl: bool,
670}
671
672impl<'a> SnapshotParameters<'a> {
673    pub fn new(
674        chain: Chain,
675        protocol_system: &'a str,
676        components: &'a HashMap<ComponentId, ProtocolComponent>,
677        contract_ids: &'a [Bytes],
678        block_number: u64,
679    ) -> Self {
680        Self {
681            chain,
682            protocol_system,
683            components,
684            entrypoints: None,
685            contract_ids,
686            block_number,
687            include_balances: true,
688            include_tvl: true,
689        }
690    }
691
692    /// Set whether to include balance information (default: true)
693    pub fn include_balances(mut self, include_balances: bool) -> Self {
694        self.include_balances = include_balances;
695        self
696    }
697
698    /// Set whether to fetch TVL data (default: true)
699    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
700        self.include_tvl = include_tvl;
701        self
702    }
703
704    pub fn entrypoints(mut self, entrypoints: &'a TracedEntryPoints) -> Self {
705        self.entrypoints = Some(entrypoints);
706        self
707    }
708}
709
710#[derive(Error, Debug)]
711pub enum RPCError {
712    /// The passed tycho url failed to parse.
713    #[error("Failed to parse URL: {0}. Error: {1}")]
714    UrlParsing(String, String),
715
716    /// The request data is not correctly formed.
717    #[error("Failed to format request: {0}")]
718    FormatRequest(String),
719
720    /// Errors forwarded from the HTTP protocol.
721    #[error("Unexpected HTTP client error: {0}")]
722    HttpClient(String, #[source] reqwest::Error),
723
724    /// The response from the server could not be parsed correctly.
725    #[error("Failed to parse response: {0}")]
726    ParseResponse(String),
727
728    /// The requested block is outside the server's retention window.
729    #[error("Snapshot block is stale: {0}")]
730    StaleBlock(String),
731
732    /// The requested extractor does not exist on the server.
733    #[error("Unknown extractor: {0}")]
734    UnknownExtractor(String),
735
736    /// Other fatal errors.
737    #[error("Fatal error: {0}")]
738    Fatal(String),
739
740    #[error("Rate limited until {0:?}")]
741    RateLimited(Option<SystemTime>),
742
743    #[error("Server unreachable: {0}")]
744    ServerUnreachable(String),
745}
746
747impl RPCError {
748    /// Converts an HTTP response body parse failure into the correct `RPCError`.
749    ///
750    /// The tycho server returns plain-text error messages (not JSON) when a requested block falls
751    /// outside its retention window. Detecting these here gives callers a typed signal to retry
752    /// with a more recent block rather than treating it as an unrecoverable parse failure.
753    ///
754    /// NOTE: The string matching below is coupled to the server's error message text. If those
755    /// messages change server-side this silently regresses to `ParseResponse`. Replace with a
756    /// structured error code if the server ever returns typed error responses.
757    fn from_parse_error(err: serde_json::Error, body: &str) -> Self {
758        if body.contains("version is older than") || body.contains("Could not find Block") {
759            RPCError::StaleBlock(body.to_string())
760        } else if body.starts_with("Unknown extractor:") {
761            RPCError::UnknownExtractor(body.to_string())
762        } else {
763            RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
764        }
765    }
766}
767
768#[cfg_attr(test, automock)]
769#[async_trait]
770pub trait RPCClient: Send + Sync {
771    /// Returns whether compression is enabled for requests.
772    fn compression(&self) -> bool;
773
774    /// Retrieves a snapshot of contract state for the given contract addresses.
775    ///
    /// The queried version comes from `params`: [`ContractStateParams::with_block_number`] or
    /// [`ContractStateParams::with_version`] pin it; otherwise `VersionParam::default()` is used.
777    async fn get_contract_state(
778        &self,
779        params: ContractStateParams,
780    ) -> Result<Page<Vec<Account>>, RPCError>;
781
782    /// Retrieves a snapshot of contract state for a set of contract IDs.
783    ///
784    /// If `chunk_size` is `None`, it defaults to the maximum page size.
785    async fn get_contract_state_paginated(
786        &self,
787        params: ContractStatePaginatedParams,
788    ) -> Result<Vec<Account>, RPCError> {
789        let semaphore = Arc::new(Semaphore::new(params.concurrency));
790
791        // Sort the ids to maximize server-side cache hits
792        let mut sorted_ids = params.contract_ids;
793        sorted_ids.sort();
794
795        let chunk_size = params
796            .chunk_size
797            .unwrap_or(StateRequestBody::effective_max_page_size(self.compression()) as usize);
798
799        let mut tasks = Vec::new();
800        for chunk in sorted_ids.chunks(chunk_size) {
801            let sem = semaphore.clone();
802            let base_params =
803                ContractStateParams::new(params.chain, params.protocol_system.as_str())
804                    .with_contract_ids(chunk.to_vec())
805                    .with_version(params.version.clone())
806                    .with_pagination(0, chunk_size as i64);
807            tasks.push(async move {
808                let _permit = sem
809                    .acquire()
810                    .await
811                    .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
812                self.get_contract_state(base_params)
813                    .await
814            });
815        }
816
817        let pages = try_join_all(tasks).await?;
818
819        let accounts = pages
820            .into_iter()
821            .flat_map(|p| p.into_iter())
822            .collect();
823
824        Ok(accounts)
825    }
826
827    /// Retrieves protocol components matching the given filters.
828    ///
829    /// Pass `component_ids` to filter by specific IDs, or `tvl_gt` to filter by minimum TVL.
830    async fn get_protocol_components(
831        &self,
832        params: ProtocolComponentsParams,
833    ) -> Result<Page<Vec<ProtocolComponent>>, RPCError>;
834
835    /// Retrieves protocol components, fetching all pages automatically.
836    ///
837    /// If `chunk_size` is `None`, it defaults to the maximum page size.
838    async fn get_protocol_components_paginated(
839        &self,
840        params: ProtocolComponentsPaginatedParams,
841    ) -> Result<Vec<ProtocolComponent>, RPCError> {
842        let chain = params.chain;
843        let protocol_system = params.protocol_system;
844        let component_ids = params.component_ids;
845        let tvl_gt = params.tvl_gt;
846        let chunk_size = params.chunk_size;
847        let concurrency = params.concurrency;
848
849        let semaphore = Arc::new(Semaphore::new(concurrency));
850
851        let chunk_size = chunk_size.unwrap_or(
852            ProtocolComponentsRequestBody::effective_max_page_size(self.compression()) as usize,
853        );
854
855        // If a set of component IDs is specified, the maximum return size is already known,
856        // allowing us to pre-compute the number of requests to be made.
857        match component_ids {
858            Some(ids) => {
859                let tasks: Vec<_> =
860                    ids.chunks(chunk_size)
861                        .enumerate()
862                        .map(|(index, chunk)| {
863                            let sem = semaphore.clone();
864                            let mut base =
865                                ProtocolComponentsParams::new(chain, protocol_system.as_str())
866                                    .with_component_ids(chunk.to_vec())
867                                    .with_pagination(index as i64, chunk_size as i64);
868                            if let Some(tvl) = tvl_gt {
869                                base = base.with_tvl_gt(tvl);
870                            }
871                            async move {
872                                let _permit = sem.acquire().await.map_err(|_| {
873                                    RPCError::Fatal("Semaphore dropped".to_string())
874                                })?;
875                                self.get_protocol_components(base).await
876                            }
877                        })
878                        .collect();
879
880                try_join_all(tasks)
881                    .await
882                    .map(|pages| pages.into_iter().flatten().collect())
883            }
884            None => {
885                // If no component ids are specified, we need to make requests based on the total
886                // number of results from the first response.
887                let mut base_params =
888                    ProtocolComponentsParams::new(chain, protocol_system.as_str())
889                        .with_pagination(0, chunk_size as i64);
890                if let Some(tvl) = tvl_gt {
891                    base_params = base_params.with_tvl_gt(tvl);
892                }
893
894                let first_page = self
895                    .get_protocol_components(base_params)
896                    .await?;
897
898                let total_items = first_page.total();
899                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
900
901                let mut all: Vec<ProtocolComponent> = first_page.into_data();
902
903                let mut page = 1;
904                while page < total_pages {
905                    let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
906
907                    let tasks: Vec<_> = (0..requests_in_this_iteration)
908                        .map(|iter| {
909                            let sem = semaphore.clone();
910                            let mut p =
911                                ProtocolComponentsParams::new(chain, protocol_system.as_str())
912                                    .with_pagination(page + iter, chunk_size as i64);
913                            if let Some(tvl) = tvl_gt {
914                                p = p.with_tvl_gt(tvl);
915                            }
916                            async move {
917                                let _permit = sem.acquire().await.map_err(|_| {
918                                    RPCError::Fatal("Semaphore dropped".to_string())
919                                })?;
920                                self.get_protocol_components(p).await
921                            }
922                        })
923                        .collect();
924
925                    let responses = try_join_all(tasks).await?;
926
927                    for resp in responses {
928                        all.extend(resp);
929                    }
930
931                    page += requests_in_this_iteration;
932                }
933                Ok(all)
934            }
935        }
936    }
937
938    /// Retrieves a page of protocol component states.
939    ///
    /// The queried version comes from `params`: [`ProtocolStatesParams::with_block_number`] or
    /// [`ProtocolStatesParams::with_version`] pin it; otherwise `VersionParam::default()` is used.
941    async fn get_protocol_states(
942        &self,
943        params: ProtocolStatesParams,
944    ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError>;
945
946    /// Retrieves protocol states for a set of protocol IDs, fetching all pages automatically.
947    ///
948    /// If `chunk_size` is `None`, it defaults to the maximum page size.
949    async fn get_protocol_states_paginated(
950        &self,
951        params: ProtocolStatesPaginatedParams,
952    ) -> Result<Vec<ProtocolComponentState>, RPCError> {
953        let semaphore = Arc::new(Semaphore::new(params.concurrency));
954
955        let chunk_size =
956            params
957                .chunk_size
958                .unwrap_or(
959                    ProtocolStateRequestBody::effective_max_page_size(self.compression()) as usize
960                );
961
962        let tasks: Vec<_> = params
963            .protocol_ids
964            .chunks(chunk_size)
965            .map(|c| {
966                let sem = semaphore.clone();
967                let p = ProtocolStatesParams::new(params.chain, params.protocol_system.as_str())
968                    .with_protocol_ids(c.to_vec())
969                    .with_include_balances(params.include_balances)
970                    .with_version(params.version.clone())
971                    .with_pagination(0, chunk_size as i64);
972                async move {
973                    let _permit = sem
974                        .acquire()
975                        .await
976                        .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
977                    self.get_protocol_states(p).await
978                }
979            })
980            .collect();
981
982        try_join_all(tasks)
983            .await
984            .map(|pages| pages.into_iter().flatten().collect())
985    }
986
987    /// Retrieves a single page of tokens.
988    ///
989    /// Use `get_all_tokens` to fetch all matching tokens automatically.
    ///
    /// `params` carries the chain, the page to fetch and the optional filters
    /// (`min_quality`, `traded_n_days_ago`).
990    async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError>;
991
992    /// Retrieves all tokens matching the given criteria, fetching all pages automatically.
993    ///
994    /// If `chunk_size` is `None`, it defaults to the maximum page size.
995    async fn get_all_tokens(&self, params: AllTokensParams) -> Result<Vec<Token>, RPCError> {
996        let chunk_size = params
997            .chunk_size
998            .unwrap_or(TokensRequestBody::effective_max_page_size(self.compression()) as usize);
999
1000        let semaphore = Arc::new(Semaphore::new(params.concurrency));
1001
1002        let page_size: i64 = chunk_size.try_into().map_err(|_| {
1003            RPCError::FormatRequest("Failed to convert chunk_size into i64".to_string())
1004        })?;
1005
1006        let mut base_params = TokensParams::new(params.chain).with_pagination(0, page_size);
1007        if let Some(q) = params.min_quality {
1008            base_params = base_params.with_min_quality(q);
1009        }
1010        if let Some(d) = params.traded_n_days_ago {
1011            base_params = base_params.with_traded_n_days_ago(d);
1012        }
1013
1014        let first_page = self.get_tokens(base_params).await?;
1015        let total_pages = (first_page.total() as f64 / chunk_size as f64).ceil() as i64;
1016
1017        let mut all_tokens: Vec<Token> = first_page.into_data();
1018
1019        if total_pages <= 1 {
1020            return Ok(all_tokens);
1021        }
1022
1023        let tasks: Vec<_> = (1..total_pages)
1024            .map(|page| {
1025                let sem = semaphore.clone();
1026                let mut p = TokensParams::new(params.chain).with_pagination(page, page_size);
1027                if let Some(q) = params.min_quality {
1028                    p = p.with_min_quality(q);
1029                }
1030                if let Some(d) = params.traded_n_days_ago {
1031                    p = p.with_traded_n_days_ago(d);
1032                }
1033                async move {
1034                    let _permit = sem
1035                        .acquire()
1036                        .await
1037                        .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1038                    self.get_tokens(p).await
1039                }
1040            })
1041            .collect();
1042
1043        let pages = try_join_all(tasks).await?;
1044        for page in pages {
1045            all_tokens.extend(page);
1046        }
1047
1048        Ok(all_tokens)
1049    }
1050
1051    /// Retrieves the protocol systems known to the server.
    ///
    /// Returns a single page; `params` selects the chain and the page to fetch.
1052    async fn get_protocol_systems(
1053        &self,
1054        params: ProtocolSystemsParams,
1055    ) -> Result<Page<ProtocolSystems>, RPCError>;
1056
1057    /// Retrieves a single page of component TVL values.
1058    ///
1059    /// Filter by `component_ids` or by `protocol_system`; both are optional.
    ///
    /// The page payload maps a string key (presumably the component ID — confirm) to its TVL.
    /// Use `get_component_tvl_paginated` to fetch all pages automatically.
1060    async fn get_component_tvl(
1061        &self,
1062        params: ComponentTvlParams,
1063    ) -> Result<Page<HashMap<String, f64>>, RPCError>;
1064
1065    /// Retrieves component TVL values, fetching all pages automatically.
1066    ///
1067    /// If `chunk_size` is `None`, it defaults to the maximum page size.
1068    async fn get_component_tvl_paginated(
1069        &self,
1070        params: ComponentTvlPaginatedParams,
1071    ) -> Result<HashMap<String, f64>, RPCError> {
1072        let semaphore = Arc::new(Semaphore::new(params.concurrency));
1073
1074        let chunk_size =
1075            params
1076                .chunk_size
1077                .unwrap_or(
1078                    ComponentTvlRequestBody::effective_max_page_size(self.compression()) as usize
1079                );
1080
1081        match params.component_ids {
1082            Some(ids) => {
1083                let tasks: Vec<_> =
1084                    ids.chunks(chunk_size)
1085                        .enumerate()
1086                        .map(|(index, chunk)| {
1087                            let sem = semaphore.clone();
1088                            let mut p = ComponentTvlParams::new(params.chain)
1089                                .with_component_ids(chunk.to_vec())
1090                                .with_pagination(index as i64, chunk_size as i64);
1091                            if let Some(ref ps) = params.protocol_system {
1092                                p = p.with_protocol_system(ps.as_str());
1093                            }
1094                            async move {
1095                                let _permit = sem.acquire().await.map_err(|_| {
1096                                    RPCError::Fatal("Semaphore dropped".to_string())
1097                                })?;
1098                                self.get_component_tvl(p).await
1099                            }
1100                        })
1101                        .collect();
1102
1103                let pages = try_join_all(tasks).await?;
1104
1105                let mut merged_tvl = HashMap::new();
1106                for page in pages {
1107                    for (key, value) in page {
1108                        *merged_tvl.entry(key).or_insert(0.0) = value;
1109                    }
1110                }
1111
1112                Ok(merged_tvl)
1113            }
1114            None => {
1115                let mut base =
1116                    ComponentTvlParams::new(params.chain).with_pagination(0, chunk_size as i64);
1117                if let Some(ref ps) = params.protocol_system {
1118                    base = base.with_protocol_system(ps.as_str());
1119                }
1120
1121                let first_page = self.get_component_tvl(base).await?;
1122                let total_items = first_page.total();
1123                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
1124
1125                let mut merged_tvl: HashMap<String, f64> = first_page.into_data();
1126
1127                let mut page = 1;
1128                while page < total_pages {
1129                    let requests_in_this_iteration =
1130                        (total_pages - page).min(params.concurrency as i64);
1131
1132                    let tasks: Vec<_> = (0..requests_in_this_iteration)
1133                        .map(|i| {
1134                            let sem = semaphore.clone();
1135                            let mut p = ComponentTvlParams::new(params.chain)
1136                                .with_pagination(page + i, chunk_size as i64);
1137                            if let Some(ref ps) = params.protocol_system {
1138                                p = p.with_protocol_system(ps.as_str());
1139                            }
1140                            async move {
1141                                let _permit = sem.acquire().await.map_err(|_| {
1142                                    RPCError::Fatal("Semaphore dropped".to_string())
1143                                })?;
1144                                self.get_component_tvl(p).await
1145                            }
1146                        })
1147                        .collect();
1148
1149                    let responses = try_join_all(tasks).await?;
1150
1151                    for resp in responses {
1152                        for (key, value) in resp {
1153                            *merged_tvl.entry(key).or_insert(0.0) = value;
1154                        }
1155                    }
1156
1157                    page += requests_in_this_iteration;
1158                }
1159
1160                Ok(merged_tvl)
1161            }
1162        }
1163    }
1164
1165    /// Retrieves a single page of traced entry points.
1166    ///
1167    /// Use `get_traced_entry_points_paginated` to fetch all pages automatically.
    ///
    /// `params` carries the chain, the protocol system, the optional component ID filter and
    /// the page to fetch.
1168    async fn get_traced_entry_points(
1169        &self,
1170        params: TracedEntryPointsParams,
1171    ) -> Result<Page<TracedEntryPoints>, RPCError>;
1172
1173    /// Retrieves traced entry points for a set of component IDs, fetching all pages automatically.
1174    ///
1175    /// If `chunk_size` is `None`, it defaults to the maximum page size.
1176    async fn get_traced_entry_points_paginated(
1177        &self,
1178        params: TracedEntryPointsPaginatedParams,
1179    ) -> Result<TracedEntryPoints, RPCError> {
1180        let chain = params.chain;
1181        let protocol_system = params.protocol_system;
1182        let component_ids = params.component_ids;
1183        let chunk_size = params.chunk_size;
1184        let concurrency = params.concurrency;
1185
1186        let semaphore = Arc::new(Semaphore::new(concurrency));
1187
1188        let chunk_size = chunk_size.unwrap_or(
1189            TracedEntryPointRequestBody::effective_max_page_size(self.compression()) as usize,
1190        );
1191
1192        let tasks: Vec<_> = component_ids
1193            .chunks(chunk_size)
1194            .map(|c| {
1195                let sem = semaphore.clone();
1196                let params = TracedEntryPointsParams::new(chain, protocol_system.as_str())
1197                    .with_component_ids(c.to_vec())
1198                    .with_pagination(0, chunk_size as i64);
1199                async move {
1200                    let _permit = sem
1201                        .acquire()
1202                        .await
1203                        .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1204                    self.get_traced_entry_points(params)
1205                        .await
1206                }
1207            })
1208            .collect();
1209
1210        try_join_all(tasks)
1211            .await
1212            .map(|pages| pages.into_iter().flatten().collect())
1213    }
1214
1215    // clippy false positive: `'a` is required by the trait method signature and is
1216    // used in `SnapshotParameters<'a>`, but `async_trait` makes Clippy miss it.
    /// Retrieves a `Snapshot` for the given `request`.
    ///
    /// NOTE(review): `chunk_size` and `concurrency` presumably control how the underlying
    /// requests are chunked and how many run at once, as in the `*_paginated` helpers —
    /// confirm against the implementation.
1217    #[allow(clippy::extra_unused_lifetimes)]
1218    async fn get_snapshots<'a>(
1219        &self,
1220        request: &SnapshotParameters<'a>,
1221        chunk_size: Option<usize>,
1222        concurrency: usize,
1223    ) -> Result<Snapshot, RPCError>;
1224}
1225
/// Settings used when constructing an `HttpRPCClient`.
#[derive(Debug, Clone)]
pub struct HttpRPCClientOptions {
    /// API key used for authentication, if any.
    pub auth_key: Option<String>,
    /// Whether compression is enabled for requests (default: true).
    /// When enabled, adds Accept-Encoding: zstd header
    pub compression: bool,
}

impl HttpRPCClientOptions {
    /// Returns the default options: no authentication key, compression enabled.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the options with the authentication key replaced by `auth_key`.
    pub fn with_auth_key(self, auth_key: Option<String>) -> Self {
        Self { auth_key, ..self }
    }

    /// Returns the options with compression switched on or off (default: on).
    pub fn with_compression(self, compression: bool) -> Self {
        Self { compression, ..self }
    }
}

impl Default for HttpRPCClientOptions {
    fn default() -> Self {
        Self { auth_key: None, compression: true }
    }
}
1260
/// HTTP-based implementation of `RPCClient`.
///
/// `retry_after` is wrapped in an `Arc`, so clones of a client share the same rate-limit
/// deadline.
1261#[derive(Debug, Clone)]
1262pub struct HttpRPCClient {
    /// Underlying `reqwest` client; `new` configures it with the default headers.
1263    http_client: Client,
    /// Base URL of the Tycho server, parsed from the `base_uri` passed to `new`.
1264    url: Url,
    /// Point in time until which requests should wait after the server rate limited us.
    /// Written by `handle_error_for_backoff`, read by `wait_until_retry_after`.
1265    retry_after: Arc<RwLock<Option<SystemTime>>>,
    /// Retry policy, cloned for every call to `make_post_request`.
1266    backoff_policy: ExponentialBackoff,
    /// Delay before retrying after a `ServerUnreachable` error.
1267    server_restart_duration: Duration,
    /// Whether compression was enabled in the options; reported by `compression()`.
1268    compression: bool,
1269}
1270
1271impl HttpRPCClient {
1272    pub fn new(base_uri: &str, options: HttpRPCClientOptions) -> Result<Self, RPCError> {
1273        let uri = base_uri
1274            .parse::<Url>()
1275            .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
1276
1277        // Add default headers
1278        let mut headers = header::HeaderMap::new();
1279        headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
1280        let user_agent = format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION"));
1281        headers.insert(
1282            header::USER_AGENT,
1283            header::HeaderValue::from_str(&user_agent)
1284                .map_err(|e| RPCError::FormatRequest(format!("Invalid user agent format: {e}")))?,
1285        );
1286
1287        // Add Authorization if one is given
1288        if let Some(key) = options.auth_key.as_deref() {
1289            let mut auth_value = header::HeaderValue::from_str(key).map_err(|e| {
1290                RPCError::FormatRequest(format!("Invalid authorization key format: {e}"))
1291            })?;
1292            auth_value.set_sensitive(true);
1293            headers.insert(header::AUTHORIZATION, auth_value);
1294        }
1295
1296        let mut client_builder = ClientBuilder::new()
1297            .default_headers(headers)
1298            .http2_prior_knowledge();
1299
1300        // When compression is disabled, turn off all automatic compression
1301        if !options.compression {
1302            client_builder = client_builder.no_zstd();
1303        }
1304
1305        let client = client_builder
1306            .build()
1307            .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1308
1309        Ok(Self {
1310            http_client: client,
1311            url: uri,
1312            retry_after: Arc::new(RwLock::new(None)),
1313            backoff_policy: ExponentialBackoffBuilder::new()
1314                .with_initial_interval(Duration::from_millis(250))
1315                // increase backoff time by 75% each failure
1316                .with_multiplier(1.75)
1317                // keep retrying every 30s
1318                .with_max_interval(Duration::from_secs(30))
1319                // if all retries take longer than 2m, give up
1320                .with_max_elapsed_time(Some(Duration::from_secs(125)))
1321                .build(),
1322            server_restart_duration: Duration::from_secs(120),
1323            compression: options.compression,
1324        })
1325    }
1326
/// Swaps the retry timings for very short ones so that tests finish quickly.
#[cfg(test)]
pub fn with_test_backoff_policy(mut self) -> Self {
    self.server_restart_duration = Duration::from_millis(50);

    // Extremely short intervals for very fast testing
    let fast_policy: ExponentialBackoff = ExponentialBackoffBuilder::new()
        .with_initial_interval(Duration::from_millis(1))
        .with_multiplier(1.1)
        .with_max_interval(Duration::from_millis(5))
        .with_max_elapsed_time(Some(Duration::from_millis(50)))
        .build();
    self.backoff_policy = fast_policy;

    self
}
1339
1340    /// Converts a error response to a Result.
1341    ///
1342    /// Raises an error if the response status code id 429, 502, 503 or 504. In the 429
1343    /// case it will try to look for a retry-after header an parse it accordingly. The
1344    /// parsed value is then passed as part of the error.
1345    async fn error_for_response(
1346        &self,
1347        response: reqwest::Response,
1348    ) -> Result<reqwest::Response, RPCError> {
1349        match response.status() {
1350            StatusCode::TOO_MANY_REQUESTS => {
1351                let retry_after_raw = response
1352                    .headers()
1353                    .get(reqwest::header::RETRY_AFTER)
1354                    .and_then(|h| h.to_str().ok())
1355                    .and_then(parse_retry_value);
1356
1357                let reason = response
1358                    .text()
1359                    .await
1360                    .unwrap_or_default();
1361                warn!(reason, retry_after = ?retry_after_raw, "Rate limited by server");
1362
1363                Err(RPCError::RateLimited(retry_after_raw))
1364            }
1365            StatusCode::BAD_GATEWAY |
1366            StatusCode::SERVICE_UNAVAILABLE |
1367            StatusCode::GATEWAY_TIMEOUT => Err(RPCError::ServerUnreachable(
1368                response
1369                    .text()
1370                    .await
1371                    .unwrap_or_else(|_| "Server Unreachable".to_string()),
1372            )),
1373            _ => Ok(response),
1374        }
1375    }
1376
1377    /// Classifies errors into transient or permanent ones.
1378    ///
1379    /// Transient errors are retried with a potential backoff, permanent ones are not.
1380    /// If the error is RateLimited, this method will set the self.retry_after value so
1381    /// future requests wait until the rate limit has been reset.
1382    async fn handle_error_for_backoff(&self, e: RPCError) -> backoff::Error<RPCError> {
1383        match e {
1384            RPCError::ServerUnreachable(_) => {
1385                backoff::Error::retry_after(e, self.server_restart_duration)
1386            }
1387            RPCError::RateLimited(Some(until)) => {
1388                let mut retry_after_guard = self.retry_after.write().await;
1389                *retry_after_guard = Some(
1390                    retry_after_guard
1391                        .unwrap_or(until)
1392                        .max(until),
1393                );
1394
1395                if let Ok(duration) = until.duration_since(SystemTime::now()) {
1396                    backoff::Error::retry_after(e, duration)
1397                } else {
1398                    e.into()
1399                }
1400            }
1401            RPCError::RateLimited(None) => e.into(),
1402            _ => backoff::Error::permanent(e),
1403        }
1404    }
1405
1406    /// Waits until the current rate limit time has passed.
1407    ///
1408    /// Only waits if there is a time and that time is in the future, else return
1409    /// immediately.
1410    async fn wait_until_retry_after(&self) {
1411        if let Some(&until) = self.retry_after.read().await.as_ref() {
1412            let now = SystemTime::now();
1413            if until > now {
1414                if let Ok(duration) = until.duration_since(now) {
1415                    sleep(duration).await
1416                }
1417            }
1418        }
1419    }
1420
1421    /// Makes a post request handling transient failures.
1422    ///
1423    /// If a retry-after header is received it will be respected. Else the configured
1424    /// backoff policy is used to deal with transient network or server errors.
1425    async fn make_post_request<T: Serialize + ?Sized>(
1426        &self,
1427        request: &T,
1428        uri: &String,
1429    ) -> Result<Response, RPCError> {
1430        self.wait_until_retry_after().await;
1431        let response = backoff::future::retry(self.backoff_policy.clone(), || async {
1432            let server_response = self
1433                .http_client
1434                .post(uri)
1435                .json(request)
1436                .send()
1437                .await
1438                .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1439
1440            match self
1441                .error_for_response(server_response)
1442                .await
1443            {
1444                Ok(response) => Ok(response),
1445                Err(e) => Err(self.handle_error_for_backoff(e).await),
1446            }
1447        })
1448        .await?;
1449        Ok(response)
1450    }
1451}
1452
1453fn parse_retry_value(val: &str) -> Option<SystemTime> {
1454    if let Ok(secs) = val.parse::<u64>() {
1455        return Some(SystemTime::now() + Duration::from_secs(secs));
1456    }
1457    if let Ok(date) = OffsetDateTime::parse(val, &Rfc2822) {
1458        return Some(date.into());
1459    }
1460    None
1461}
1462
1463#[async_trait]
1464impl RPCClient for HttpRPCClient {
1465    fn compression(&self) -> bool {
1466        self.compression
1467    }
1468
1469    #[instrument(skip(self))]
1470    async fn get_contract_state(
1471        &self,
1472        params: ContractStateParams,
1473    ) -> Result<Page<Vec<Account>>, RPCError> {
1474        if params
1475            .contract_ids
1476            .as_ref()
1477            .is_none_or(|ids| ids.is_empty())
1478        {
1479            warn!("No contract ids specified in request.");
1480        }
1481
1482        let request = StateRequestBody {
1483            contract_ids: params.contract_ids,
1484            protocol_system: params.protocol_system,
1485            chain: params.chain.into(),
1486            version: params.version,
1487            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1488        };
1489
1490        let uri = format!(
1491            "{}/{}/contract_state",
1492            self.url
1493                .to_string()
1494                .trim_end_matches('/'),
1495            TYCHO_SERVER_VERSION
1496        );
1497        debug!(%uri, "Sending contract_state request to Tycho server");
1498        trace!(?request, "Sending request to Tycho server");
1499        let response = self
1500            .make_post_request(&request, &uri)
1501            .await?;
1502        trace!(?response, "Received response from Tycho server");
1503
1504        let body = response
1505            .text()
1506            .await
1507            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1508        if body.is_empty() {
1509            // Pure native protocols will return empty contract states
1510            return Ok(Page::new(vec![], 0, 0, 0));
1511        }
1512
1513        let dto_response = serde_json::from_str::<StateRequestResponse>(&body)
1514            .map_err(|err| RPCError::from_parse_error(err, &body))?;
1515        trace!(?dto_response, "Received contract_state response from Tycho server");
1516
1517        let data: Vec<Account> = dto_response
1518            .accounts
1519            .into_iter()
1520            .map(Account::from)
1521            .collect();
1522        Ok(Page::new(
1523            data,
1524            dto_response.pagination.total,
1525            dto_response.pagination.page,
1526            dto_response.pagination.page_size,
1527        ))
1528    }
1529
1530    async fn get_protocol_components(
1531        &self,
1532        params: ProtocolComponentsParams,
1533    ) -> Result<Page<Vec<ProtocolComponent>>, RPCError> {
1534        let request = ProtocolComponentsRequestBody {
1535            protocol_system: params.protocol_system,
1536            component_ids: params.component_ids,
1537            tvl_gt: params.tvl_gt,
1538            chain: params.chain.into(),
1539            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1540        };
1541
1542        let uri = format!(
1543            "{}/{}/protocol_components",
1544            self.url
1545                .to_string()
1546                .trim_end_matches('/'),
1547            TYCHO_SERVER_VERSION,
1548        );
1549        debug!(%uri, "Sending protocol_components request to Tycho server");
1550        trace!(?request, "Sending request to Tycho server");
1551
1552        let response = self
1553            .make_post_request(&request, &uri)
1554            .await?;
1555
1556        trace!(?response, "Received response from Tycho server");
1557
1558        let body = response
1559            .text()
1560            .await
1561            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1562        let dto_response = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
1563            .map_err(|err| RPCError::from_parse_error(err, &body))?;
1564        trace!(?dto_response, "Received protocol_components response from Tycho server");
1565
1566        let data: Vec<ProtocolComponent> = dto_response
1567            .protocol_components
1568            .into_iter()
1569            .map(ProtocolComponent::from)
1570            .collect();
1571        Ok(Page::new(
1572            data,
1573            dto_response.pagination.total,
1574            dto_response.pagination.page,
1575            dto_response.pagination.page_size,
1576        ))
1577    }
1578
1579    async fn get_protocol_states(
1580        &self,
1581        params: ProtocolStatesParams,
1582    ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError> {
1583        if params
1584            .protocol_ids
1585            .as_ref()
1586            .is_none_or(|ids| ids.is_empty())
1587        {
1588            warn!("No protocol ids specified in request.");
1589        }
1590
1591        let request = ProtocolStateRequestBody {
1592            protocol_ids: params.protocol_ids,
1593            protocol_system: params.protocol_system,
1594            chain: params.chain.into(),
1595            include_balances: params.include_balances,
1596            version: params.version,
1597            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1598        };
1599
1600        let uri = format!(
1601            "{}/{}/protocol_state",
1602            self.url
1603                .to_string()
1604                .trim_end_matches('/'),
1605            TYCHO_SERVER_VERSION
1606        );
1607        debug!(%uri, "Sending protocol_states request to Tycho server");
1608        trace!(?request, "Sending request to Tycho server");
1609
1610        let response = self
1611            .make_post_request(&request, &uri)
1612            .await?;
1613        trace!(?response, "Received response from Tycho server");
1614
1615        let body = response
1616            .text()
1617            .await
1618            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1619
1620        if body.is_empty() {
1621            // Pure VM protocols will return empty states
1622            return Ok(Page::new(vec![], 0, 0, 0));
1623        }
1624
1625        let dto_response = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
1626            .map_err(|err| RPCError::from_parse_error(err, &body))?;
1627        trace!(?dto_response, "Received protocol_states response from Tycho server");
1628
1629        let data: Vec<ProtocolComponentState> = dto_response
1630            .states
1631            .into_iter()
1632            .map(ProtocolComponentState::from)
1633            .collect();
1634        Ok(Page::new(
1635            data,
1636            dto_response.pagination.total,
1637            dto_response.pagination.page,
1638            dto_response.pagination.page_size,
1639        ))
1640    }
1641
1642    async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError> {
1643        let request = TokensRequestBody {
1644            token_addresses: None,
1645            min_quality: params.min_quality,
1646            traded_n_days_ago: params.traded_n_days_ago,
1647            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1648            chain: params.chain.into(),
1649        };
1650
1651        let uri = format!(
1652            "{}/{}/tokens",
1653            self.url
1654                .to_string()
1655                .trim_end_matches('/'),
1656            TYCHO_SERVER_VERSION
1657        );
1658        debug!(%uri, "Sending tokens request to Tycho server");
1659
1660        let response = self
1661            .make_post_request(&request, &uri)
1662            .await?;
1663
1664        let body = response
1665            .text()
1666            .await
1667            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1668        let dto_response = serde_json::from_str::<TokensRequestResponse>(&body)
1669            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1670
1671        let data: Vec<Token> = dto_response
1672            .tokens
1673            .into_iter()
1674            .map(Token::from)
1675            .collect();
1676        Ok(Page::new(
1677            data,
1678            dto_response.pagination.total,
1679            dto_response.pagination.page,
1680            dto_response.pagination.page_size,
1681        ))
1682    }
1683
1684    async fn get_protocol_systems(
1685        &self,
1686        params: ProtocolSystemsParams,
1687    ) -> Result<Page<ProtocolSystems>, RPCError> {
1688        let request = ProtocolSystemsRequestBody {
1689            chain: params.chain.into(),
1690            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1691        };
1692
1693        let uri = format!(
1694            "{}/{}/protocol_systems",
1695            self.url
1696                .to_string()
1697                .trim_end_matches('/'),
1698            TYCHO_SERVER_VERSION
1699        );
1700        debug!(%uri, "Sending protocol_systems request to Tycho server");
1701        trace!(?request, "Sending request to Tycho server");
1702        let response = self
1703            .make_post_request(&request, &uri)
1704            .await?;
1705        trace!(?response, "Received response from Tycho server");
1706        let body = response
1707            .text()
1708            .await
1709            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1710        let dto = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
1711            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1712        trace!(?dto, "Received protocol_systems response from Tycho server");
1713        Ok(Page::new(
1714            ProtocolSystems::new(dto.protocol_systems, dto.dci_protocols),
1715            dto.pagination.total,
1716            dto.pagination.page,
1717            dto.pagination.page_size,
1718        ))
1719    }
1720
1721    async fn get_component_tvl(
1722        &self,
1723        params: ComponentTvlParams,
1724    ) -> Result<Page<HashMap<String, f64>>, RPCError> {
1725        let request = ComponentTvlRequestBody {
1726            chain: params.chain.into(),
1727            protocol_system: params.protocol_system,
1728            component_ids: params.component_ids,
1729            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1730        };
1731
1732        let uri = format!(
1733            "{}/{}/component_tvl",
1734            self.url
1735                .to_string()
1736                .trim_end_matches('/'),
1737            TYCHO_SERVER_VERSION
1738        );
1739        debug!(%uri, "Sending get_component_tvl request to Tycho server");
1740        trace!(?request, "Sending request to Tycho server");
1741        let response = self
1742            .make_post_request(&request, &uri)
1743            .await?;
1744        trace!(?response, "Received response from Tycho server");
1745        let body = response
1746            .text()
1747            .await
1748            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1749        let dto_response =
1750            serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
1751                error!("Failed to parse component_tvl response: {:?}", &body);
1752                RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1753            })?;
1754        trace!(?dto_response, "Received component_tvl response from Tycho server");
1755        Ok(Page::new(
1756            dto_response.tvl,
1757            dto_response.pagination.total,
1758            dto_response.pagination.page,
1759            dto_response.pagination.page_size,
1760        ))
1761    }
1762
1763    async fn get_traced_entry_points(
1764        &self,
1765        params: TracedEntryPointsParams,
1766    ) -> Result<Page<TracedEntryPoints>, RPCError> {
1767        let request = TracedEntryPointRequestBody {
1768            chain: params.chain.into(),
1769            protocol_system: params.protocol_system,
1770            component_ids: params.component_ids,
1771            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1772        };
1773
1774        let uri = format!(
1775            "{}/{TYCHO_SERVER_VERSION}/traced_entry_points",
1776            self.url
1777                .to_string()
1778                .trim_end_matches('/')
1779        );
1780        debug!(%uri, "Sending traced_entry_points request to Tycho server");
1781        trace!(?request, "Sending request to Tycho server");
1782
1783        let response = self
1784            .make_post_request(&request, &uri)
1785            .await?;
1786
1787        trace!(?response, "Received response from Tycho server");
1788
1789        let body = response
1790            .text()
1791            .await
1792            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1793        let dto_response =
1794            serde_json::from_str::<TracedEntryPointRequestResponse>(&body).map_err(|err| {
1795                error!("Failed to parse traced_entry_points response: {:?}", &body);
1796                RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1797            })?;
1798        trace!(?dto_response, "Received traced_entry_points response from Tycho server");
1799        let data: TracedEntryPoints = dto_response
1800            .traced_entry_points
1801            .into_iter()
1802            .map(|(k, v)| {
1803                (
1804                    k,
1805                    v.into_iter()
1806                        .map(|(ep, tr)| {
1807                            (EntryPointWithTracingParams::from(ep), TracingResult::from(tr))
1808                        })
1809                        .collect(),
1810                )
1811            })
1812            .collect();
1813        Ok(Page::new(
1814            data,
1815            dto_response.pagination.total,
1816            dto_response.pagination.page,
1817            dto_response.pagination.page_size,
1818        ))
1819    }
1820
1821    async fn get_snapshots<'a>(
1822        &self,
1823        request: &SnapshotParameters<'a>,
1824        chunk_size: Option<usize>,
1825        concurrency: usize,
1826    ) -> Result<Snapshot, RPCError> {
1827        let component_ids: Vec<_> = request
1828            .components
1829            .keys()
1830            .cloned()
1831            .collect();
1832
1833        let component_tvl = if request.include_tvl && !component_ids.is_empty() {
1834            self.get_component_tvl_paginated(
1835                ComponentTvlPaginatedParams::new(request.chain, concurrency)
1836                    .with_component_ids(component_ids.clone()),
1837            )
1838            .await?
1839        } else {
1840            HashMap::new()
1841        };
1842
1843        let version = VersionParam::at_block(request.chain.into(), request.block_number);
1844
1845        let mut protocol_states = if !component_ids.is_empty() {
1846            self.get_protocol_states_paginated(
1847                ProtocolStatesPaginatedParams::new(
1848                    request.chain,
1849                    request.protocol_system,
1850                    concurrency,
1851                )
1852                .with_protocol_ids(component_ids.clone())
1853                .with_include_balances(request.include_balances)
1854                .with_version(version.clone()),
1855            )
1856            .await?
1857            .into_iter()
1858            .map(|state| (state.component_id.clone(), state))
1859            .collect()
1860        } else {
1861            HashMap::new()
1862        };
1863
1864        // Convert to ComponentWithState, which includes entrypoint information.
1865        let states = request
1866            .components
1867            .values()
1868            .filter_map(|component| {
1869                if let Some(state) = protocol_states.remove(&component.id) {
1870                    Some((
1871                        component.id.clone(),
1872                        ComponentWithState {
1873                            state,
1874                            component: component.clone(),
1875                            component_tvl: component_tvl
1876                                .get(&component.id)
1877                                .cloned(),
1878                            entrypoints: request
1879                                .entrypoints
1880                                .as_ref()
1881                                .and_then(|map| map.get(&component.id))
1882                                .cloned()
1883                                .unwrap_or_default(),
1884                        },
1885                    ))
1886                } else if component_ids.contains(&component.id) {
1887                    // only emit error event if we requested this component
1888                    let component_id = &component.id;
1889                    error!(?component_id, "Missing state for native component!");
1890                    None
1891                } else {
1892                    None
1893                }
1894            })
1895            .collect();
1896
1897        let vm_storage = if !request.contract_ids.is_empty() {
1898            let mut cp_params = ContractStatePaginatedParams::new(
1899                request.chain,
1900                request.protocol_system,
1901                concurrency,
1902            )
1903            .with_contract_ids(request.contract_ids.to_vec())
1904            .with_version(version.clone());
1905            if let Some(cs) = chunk_size {
1906                cp_params = cp_params.with_chunk_size(cs);
1907            }
1908            let contract_states = self
1909                .get_contract_state_paginated(cp_params)
1910                .await?
1911                .into_iter()
1912                .map(|acc| (acc.address.clone(), acc))
1913                .collect::<HashMap<_, _>>();
1914
1915            trace!(states=?&contract_states, "Retrieved ContractState");
1916
1917            let contract_address_to_components = request
1918                .components
1919                .iter()
1920                .filter_map(|(id, comp)| {
1921                    if component_ids.contains(id) {
1922                        Some(
1923                            comp.contract_addresses
1924                                .iter()
1925                                .map(|address| (address.clone(), comp.id.clone())),
1926                        )
1927                    } else {
1928                        None
1929                    }
1930                })
1931                .flatten()
1932                .fold(HashMap::<Bytes, Vec<String>>::new(), |mut acc, (addr, c_id)| {
1933                    acc.entry(addr).or_default().push(c_id);
1934                    acc
1935                });
1936
1937            request
1938                .contract_ids
1939                .iter()
1940                .filter_map(|address| {
1941                    if let Some(state) = contract_states.get(address) {
1942                        Some((address.clone(), state.clone()))
1943                    } else if let Some(ids) = contract_address_to_components.get(address) {
1944                        // only emit error even if we did actually request this address
1945                        error!(
1946                            ?address,
1947                            ?ids,
1948                            "Component with lacking contract storage encountered!"
1949                        );
1950                        None
1951                    } else {
1952                        None
1953                    }
1954                })
1955                .collect()
1956        } else {
1957            HashMap::new()
1958        };
1959
1960        Ok(Snapshot { states, vm_storage })
1961    }
1962}
1963
1964#[cfg(test)]
1965mod tests {
1966    use std::{
1967        collections::{HashMap, HashSet},
1968        str::FromStr,
1969    };
1970
1971    use mockito::Server;
1972    use rstest::rstest;
1973    use tycho_common::models::blockchain::AddressStorageLocation;
1974
1975    use super::*;
1976
1977    // Dummy implementation of `get_protocol_states_paginated` for backwards compatibility testing
1978    // purposes
1979    impl MockRPCClient {
1980        #[allow(clippy::too_many_arguments)]
1981        async fn test_get_protocol_states_paginated<T>(
1982            &self,
1983            chain: Chain,
1984            ids: &[T],
1985            protocol_system: &str,
1986            include_balances: bool,
1987            block_number: Option<u64>,
1988            chunk_size: usize,
1989            _concurrency: usize,
1990        ) -> Vec<(Chain, Vec<String>, String, bool, Option<u64>, PaginationParams)>
1991        where
1992            T: AsRef<str> + Clone + Send + Sync + 'static,
1993        {
1994            ids.chunks(chunk_size)
1995                .map(|chunk| {
1996                    (
1997                        chain,
1998                        chunk
1999                            .iter()
2000                            .map(|id| id.as_ref().to_string())
2001                            .collect(),
2002                        protocol_system.to_string(),
2003                        include_balances,
2004                        block_number,
2005                        PaginationParams { page: 0, page_size: chunk_size as i64 },
2006                    )
2007                })
2008                .collect()
2009        }
2010    }
2011
    /// Canned JSON body served by the mocked `/v1/contract_state` endpoint in
    /// `test_get_contract_state`: one Ethereum account at the zero address with a native
    /// balance of `0x01f4` (500), plus pagination metadata.
    const GET_CONTRACT_STATE_RESP: &str = r#"
        {
            "accounts": [
                {
                    "chain": "ethereum",
                    "address": "0x0000000000000000000000000000000000000000",
                    "title": "",
                    "slots": {},
                    "native_balance": "0x01f4",
                    "token_balances": {},
                    "code": "0x00",
                    "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
                    "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
                    "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
                    "creation_tx": null
                }
            ],
            "pagination": {
                "page": 0,
                "page_size": 20,
                "total": 10
            }
        }
        "#;
2036
2037    #[rstest]
2038    #[case::string_input(vec![
2039        "id1".to_string(),
2040        "id2".to_string()
2041    ])]
2042    #[tokio::test]
2043    async fn test_get_protocol_states_paginated<T>(#[case] ids: Vec<T>)
2044    where
2045        T: AsRef<str> + Clone + Send + Sync + 'static,
2046    {
2047        let mock_client = MockRPCClient::new();
2048
2049        let request_args = mock_client
2050            .test_get_protocol_states_paginated(
2051                Chain::Ethereum,
2052                &ids,
2053                "test_system",
2054                true,
2055                None,
2056                2,
2057                2,
2058            )
2059            .await;
2060
2061        // Verify that the request args have been split into chunks correctly
2062        assert_eq!(request_args.len(), 1);
2063        assert_eq!(request_args[0].1.len(), 2);
2064    }
2065
2066    #[tokio::test]
2067    async fn test_get_contract_state() {
2068        let mut server = Server::new_async().await;
2069        let server_resp = GET_CONTRACT_STATE_RESP;
2070        // test that the response is deserialized correctly
2071        serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
2072
2073        let mocked_server = server
2074            .mock("POST", "/v1/contract_state")
2075            .expect(1)
2076            .with_body(server_resp)
2077            .create_async()
2078            .await;
2079
2080        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2081            .expect("create client");
2082
2083        let accounts = client
2084            .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
2085            .await
2086            .expect("get state");
2087
2088        mocked_server.assert();
2089        assert_eq!(accounts.data().len(), 1);
2090        assert_eq!(accounts.data()[0].slots, HashMap::new());
2091        assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
2092        assert_eq!(accounts.data()[0].code, [0].to_vec());
2093        assert_eq!(
2094            accounts.data()[0].code_hash,
2095            hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
2096                .unwrap()
2097        );
2098    }
2099
2100    #[tokio::test]
2101    async fn test_get_protocol_components() {
2102        let mut server = Server::new_async().await;
2103        let server_resp = r#"
2104        {
2105            "protocol_components": [
2106                {
2107                    "id": "State1",
2108                    "protocol_system": "ambient",
2109                    "protocol_type_name": "Pool",
2110                    "chain": "ethereum",
2111                    "tokens": [
2112                        "0x0000000000000000000000000000000000000000",
2113                        "0x0000000000000000000000000000000000000001"
2114                    ],
2115                    "contract_ids": [
2116                        "0x0000000000000000000000000000000000000000"
2117                    ],
2118                    "static_attributes": {
2119                        "attribute_1": "0x00000000000003e8"
2120                    },
2121                    "change": "Creation",
2122                    "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2123                    "created_at": "2022-01-01T00:00:00"
2124                }
2125            ],
2126            "pagination": {
2127                "page": 0,
2128                "page_size": 20,
2129                "total": 10
2130            }
2131        }
2132        "#;
2133        // test that the response is deserialized correctly
2134        serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
2135
2136        let mocked_server = server
2137            .mock("POST", "/v1/protocol_components")
2138            .expect(1)
2139            .with_body(server_resp)
2140            .create_async()
2141            .await;
2142
2143        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2144            .expect("create client");
2145
2146        let components = client
2147            .get_protocol_components(ProtocolComponentsParams::new(Chain::Ethereum, ""))
2148            .await
2149            .expect("get state");
2150
2151        mocked_server.assert();
2152        assert_eq!(components.data().len(), 1);
2153        assert_eq!(components.data()[0].id, "State1");
2154        assert_eq!(components.data()[0].protocol_system, "ambient");
2155        assert_eq!(components.data()[0].protocol_type_name, "Pool");
2156        assert_eq!(components.data()[0].tokens.len(), 2);
2157        let expected_attributes =
2158            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2159                .iter()
2160                .cloned()
2161                .collect::<HashMap<String, Bytes>>();
2162        assert_eq!(components.data()[0].static_attributes, expected_attributes);
2163    }
2164
2165    #[tokio::test]
2166    async fn test_get_protocol_states() {
2167        let mut server = Server::new_async().await;
2168        let server_resp = r#"
2169        {
2170            "states": [
2171                {
2172                    "component_id": "State1",
2173                    "attributes": {
2174                        "attribute_1": "0x00000000000003e8"
2175                    },
2176                    "balances": {
2177                        "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
2178                    }
2179                }
2180            ],
2181            "pagination": {
2182                "page": 0,
2183                "page_size": 20,
2184                "total": 10
2185            }
2186        }
2187        "#;
2188        // test that the response is deserialized correctly
2189        serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
2190
2191        let mocked_server = server
2192            .mock("POST", "/v1/protocol_state")
2193            .expect(1)
2194            .with_body(server_resp)
2195            .create_async()
2196            .await;
2197        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2198            .expect("create client");
2199
2200        let states = client
2201            .get_protocol_states(
2202                ProtocolStatesParams::new(Chain::Ethereum, "").with_include_balances(true),
2203            )
2204            .await
2205            .expect("get state");
2206
2207        mocked_server.assert();
2208        assert_eq!(states.data().len(), 1);
2209        assert_eq!(states.data()[0].component_id, "State1");
2210        let expected_attributes =
2211            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2212                .iter()
2213                .cloned()
2214                .collect::<HashMap<String, Bytes>>();
2215        assert_eq!(states.data()[0].attributes, expected_attributes);
2216        let expected_balances = [(
2217            Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
2218                .expect("Unsupported address format"),
2219            Bytes::from_str("0x01f4").unwrap(),
2220        )]
2221        .iter()
2222        .cloned()
2223        .collect::<HashMap<Bytes, Bytes>>();
2224        assert_eq!(states.data()[0].balances, expected_balances);
2225    }
2226
2227    #[tokio::test]
2228    async fn test_get_tokens() {
2229        let mut server = Server::new_async().await;
2230        let server_resp = r#"
2231        {
2232            "tokens": [
2233              {
2234                "chain": "ethereum",
2235                "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
2236                "symbol": "WETH",
2237                "decimals": 18,
2238                "tax": 0,
2239                "gas": [
2240                  29962
2241                ],
2242                "quality": 100
2243              },
2244              {
2245                "chain": "ethereum",
2246                "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
2247                "symbol": "USDC",
2248                "decimals": 6,
2249                "tax": 0,
2250                "gas": [
2251                  40652
2252                ],
2253                "quality": 100
2254              }
2255            ],
2256            "pagination": {
2257              "page": 0,
2258              "page_size": 20,
2259              "total": 10
2260            }
2261          }
2262        "#;
2263        // test that the response is deserialized correctly
2264        serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
2265
2266        let mocked_server = server
2267            .mock("POST", "/v1/tokens")
2268            .expect(1)
2269            .with_body(server_resp)
2270            .create_async()
2271            .await;
2272        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2273            .expect("create client");
2274
2275        let tokens = client
2276            .get_tokens(TokensParams::new(Chain::Ethereum))
2277            .await
2278            .expect("get tokens");
2279
2280        let expected = vec![
2281            Token {
2282                chain: tycho_common::models::Chain::Ethereum,
2283                address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
2284                symbol: "WETH".to_string(),
2285                decimals: 18,
2286                tax: 0,
2287                gas: vec![Some(29962)],
2288                quality: 100,
2289            },
2290            Token {
2291                chain: tycho_common::models::Chain::Ethereum,
2292                address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
2293                symbol: "USDC".to_string(),
2294                decimals: 6,
2295                tax: 0,
2296                gas: vec![Some(40652)],
2297                quality: 100,
2298            },
2299        ];
2300
2301        mocked_server.assert();
2302        assert_eq!(*tokens.data(), expected);
2303    }
2304
2305    #[rstest]
2306    #[case::with_dci(Some(vec!["system2"]), vec!["system2"])]
2307    #[case::backward_compat(None, vec![])]
2308    #[tokio::test]
2309    async fn test_get_protocol_systems(
2310        #[case] dci_protocols: Option<Vec<&str>>,
2311        #[case] expected_dci: Vec<&str>,
2312    ) {
2313        use serde_json::json;
2314
2315        let mut json_value = json!({
2316            "protocol_systems": ["system1", "system2"],
2317            "pagination": { "page": 0, "page_size": 20, "total": 2 }
2318        });
2319        if let Some(dci) = dci_protocols {
2320            json_value["dci_protocols"] = json!(dci);
2321        }
2322        let server_resp = serde_json::to_string(&json_value).unwrap();
2323
2324        let mut server = Server::new_async().await;
2325        let mocked_server = server
2326            .mock("POST", "/v1/protocol_systems")
2327            .expect(1)
2328            .with_body(&server_resp)
2329            .create_async()
2330            .await;
2331        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2332            .expect("create client");
2333
2334        let response = client
2335            .get_protocol_systems(ProtocolSystemsParams::new(Chain::Ethereum))
2336            .await
2337            .expect("get protocol systems");
2338
2339        mocked_server.assert();
2340        assert_eq!(response.data().protocol_systems(), ["system1", "system2"]);
2341        assert_eq!(response.data().dci_protocols(), expected_dci.as_slice());
2342    }
2343
2344    #[tokio::test]
2345    async fn test_get_component_tvl() {
2346        let mut server = Server::new_async().await;
2347        let server_resp = r#"
2348        {
2349            "tvl": {
2350                "component1": 100.0
2351            },
2352            "pagination": {
2353                "page": 0,
2354                "page_size": 20,
2355                "total": 10
2356            }
2357        }
2358        "#;
2359        // test that the response is deserialized correctly
2360        serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
2361
2362        let mocked_server = server
2363            .mock("POST", "/v1/component_tvl")
2364            .expect(1)
2365            .with_body(server_resp)
2366            .create_async()
2367            .await;
2368        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2369            .expect("create client");
2370
2371        let component_tvl = client
2372            .get_component_tvl(ComponentTvlParams::new(Chain::Ethereum))
2373            .await
2374            .expect("get component tvl");
2375
2376        mocked_server.assert();
2377        assert_eq!(component_tvl.data().get("component1"), Some(&100.0));
2378    }
2379
2380    #[tokio::test]
2381    async fn test_get_traced_entry_points() {
2382        let mut server = Server::new_async().await;
2383        let server_resp = r#"
2384        {
2385            "traced_entry_points": {
2386                "component_1": [
2387                    [
2388                        {
2389                            "entry_point": {
2390                                "external_id": "entrypoint_a",
2391                                "target": "0x0000000000000000000000000000000000000001",
2392                                "signature": "sig()"
2393                            },
2394                            "params": {
2395                                "method": "rpctracer",
2396                                "caller": "0x000000000000000000000000000000000000000a",
2397                                "calldata": "0x000000000000000000000000000000000000000b"
2398                            }
2399                        },
2400                        {
2401                            "retriggers": [
2402                                [
2403                                    "0x00000000000000000000000000000000000000aa",
2404                                    {"key": "0x0000000000000000000000000000000000000aaa", "offset": 12}
2405                                ]
2406                            ],
2407                            "accessed_slots": {
2408                                "0x0000000000000000000000000000000000aaaa": [
2409                                    "0x0000000000000000000000000000000000aaaa"
2410                                ]
2411                            }
2412                        }
2413                    ]
2414                ]
2415            },
2416            "pagination": {
2417                "page": 0,
2418                "page_size": 20,
2419                "total": 1
2420            }
2421        }
2422        "#;
2423        // test that the response is deserialized correctly
2424        serde_json::from_str::<TracedEntryPointRequestResponse>(server_resp).expect("deserialize");
2425
2426        let mocked_server = server
2427            .mock("POST", "/v1/traced_entry_points")
2428            .expect(1)
2429            .with_body(server_resp)
2430            .create_async()
2431            .await;
2432        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2433            .expect("create client");
2434
2435        let entrypoints = client
2436            .get_traced_entry_points(TracedEntryPointsParams::new(Chain::Ethereum, ""))
2437            .await
2438            .expect("get traced entry points");
2439
2440        mocked_server.assert();
2441        assert_eq!(entrypoints.data().len(), 1);
2442        let comp1_entrypoints = entrypoints
2443            .data()
2444            .get("component_1")
2445            .expect("component_1 entrypoints should exist");
2446        assert_eq!(comp1_entrypoints.len(), 1);
2447
2448        let (entrypoint, trace_result) = &comp1_entrypoints[0];
2449        assert_eq!(entrypoint.entry_point.external_id, "entrypoint_a");
2450        assert_eq!(
2451            entrypoint.entry_point.target,
2452            Bytes::from_str("0x0000000000000000000000000000000000000001").unwrap()
2453        );
2454        assert_eq!(entrypoint.entry_point.signature, "sig()");
2455        let tycho_common::models::blockchain::TracingParams::RPCTracer(rpc_params) =
2456            &entrypoint.params;
2457        assert_eq!(
2458            rpc_params.caller,
2459            Some(Bytes::from("0x000000000000000000000000000000000000000a"))
2460        );
2461        assert_eq!(rpc_params.calldata, Bytes::from("0x000000000000000000000000000000000000000b"));
2462
2463        assert_eq!(
2464            trace_result.retriggers,
2465            HashSet::from([(
2466                Bytes::from("0x00000000000000000000000000000000000000aa"),
2467                AddressStorageLocation::new(
2468                    Bytes::from("0x0000000000000000000000000000000000000aaa"),
2469                    12
2470                )
2471            )])
2472        );
2473        assert_eq!(trace_result.accessed_slots.len(), 1);
2474        assert_eq!(
2475            trace_result.accessed_slots,
2476            HashMap::from([(
2477                Bytes::from("0x0000000000000000000000000000000000aaaa"),
2478                HashSet::from([Bytes::from("0x0000000000000000000000000000000000aaaa")])
2479            )])
2480        );
2481    }
2482
2483    #[tokio::test]
2484    async fn test_parse_retry_value_numeric() {
2485        let result = parse_retry_value("60");
2486        assert!(result.is_some());
2487
2488        let expected_time = SystemTime::now() + Duration::from_secs(60);
2489        let actual_time = result.unwrap();
2490
2491        // Allow for small timing differences during test execution
2492        let diff = if actual_time > expected_time {
2493            actual_time
2494                .duration_since(expected_time)
2495                .unwrap()
2496        } else {
2497            expected_time
2498                .duration_since(actual_time)
2499                .unwrap()
2500        };
2501        assert!(diff < Duration::from_secs(1), "Time difference too large: {:?}", diff);
2502    }
2503
2504    #[tokio::test]
2505    async fn test_parse_retry_value_rfc2822() {
2506        // Use a fixed future date in RFC2822 format
2507        let rfc2822_date = "Sat, 01 Jan 2030 12:00:00 +0000";
2508        let result = parse_retry_value(rfc2822_date);
2509        assert!(result.is_some());
2510
2511        let parsed_time = result.unwrap();
2512        assert!(parsed_time > SystemTime::now());
2513    }
2514
2515    #[tokio::test]
2516    async fn test_parse_retry_value_invalid_formats() {
2517        // Test various invalid formats
2518        assert!(parse_retry_value("invalid").is_none());
2519        assert!(parse_retry_value("").is_none());
2520        assert!(parse_retry_value("not_a_number").is_none());
2521        assert!(parse_retry_value("Mon, 32 Jan 2030 25:00:00 +0000").is_none());
2522        // Invalid date
2523    }
2524
2525    #[tokio::test]
2526    async fn test_parse_retry_value_zero_seconds() {
2527        let result = parse_retry_value("0");
2528        assert!(result.is_some());
2529
2530        let expected_time = SystemTime::now();
2531        let actual_time = result.unwrap();
2532
2533        // Should be very close to current time
2534        let diff = if actual_time > expected_time {
2535            actual_time
2536                .duration_since(expected_time)
2537                .unwrap()
2538        } else {
2539            expected_time
2540                .duration_since(actual_time)
2541                .unwrap()
2542        };
2543        assert!(diff < Duration::from_secs(1));
2544    }
2545
2546    #[tokio::test]
2547    async fn test_error_for_response_rate_limited() {
2548        let mut server = Server::new_async().await;
2549        let mock = server
2550            .mock("GET", "/test")
2551            .with_status(429)
2552            .with_header("Retry-After", "60")
2553            .create_async()
2554            .await;
2555
2556        let client = reqwest::Client::new();
2557        let response = client
2558            .get(format!("{}/test", server.url()))
2559            .send()
2560            .await
2561            .unwrap();
2562
2563        let http_client =
2564            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2565                .unwrap()
2566                .with_test_backoff_policy();
2567        let result = http_client
2568            .error_for_response(response)
2569            .await;
2570
2571        mock.assert();
2572        assert!(matches!(result, Err(RPCError::RateLimited(_))));
2573        if let Err(RPCError::RateLimited(retry_after)) = result {
2574            assert!(retry_after.is_some());
2575        }
2576    }
2577
2578    #[tokio::test]
2579    async fn test_error_for_response_rate_limited_no_header() {
2580        let mut server = Server::new_async().await;
2581        let mock = server
2582            .mock("GET", "/test")
2583            .with_status(429)
2584            .create_async()
2585            .await;
2586
2587        let client = reqwest::Client::new();
2588        let response = client
2589            .get(format!("{}/test", server.url()))
2590            .send()
2591            .await
2592            .unwrap();
2593
2594        let http_client =
2595            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2596                .unwrap()
2597                .with_test_backoff_policy();
2598        let result = http_client
2599            .error_for_response(response)
2600            .await;
2601
2602        mock.assert();
2603        assert!(matches!(result, Err(RPCError::RateLimited(None))));
2604    }
2605
2606    #[tokio::test]
2607    async fn test_error_for_response_server_errors() {
2608        let test_cases =
2609            vec![(502, "Bad Gateway"), (503, "Service Unavailable"), (504, "Gateway Timeout")];
2610
2611        for (status_code, expected_body) in test_cases {
2612            let mut server = Server::new_async().await;
2613            let mock = server
2614                .mock("GET", "/test")
2615                .with_status(status_code)
2616                .with_body(expected_body)
2617                .create_async()
2618                .await;
2619
2620            let client = reqwest::Client::new();
2621            let response = client
2622                .get(format!("{}/test", server.url()))
2623                .send()
2624                .await
2625                .unwrap();
2626
2627            let http_client =
2628                HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2629                    .unwrap()
2630                    .with_test_backoff_policy();
2631            let result = http_client
2632                .error_for_response(response)
2633                .await;
2634
2635            mock.assert();
2636            assert!(matches!(result, Err(RPCError::ServerUnreachable(_))));
2637            if let Err(RPCError::ServerUnreachable(body)) = result {
2638                assert_eq!(body, expected_body);
2639            }
2640        }
2641    }
2642
2643    #[tokio::test]
2644    async fn test_error_for_response_success() {
2645        let mut server = Server::new_async().await;
2646        let mock = server
2647            .mock("GET", "/test")
2648            .with_status(200)
2649            .with_body("success")
2650            .create_async()
2651            .await;
2652
2653        let client = reqwest::Client::new();
2654        let response = client
2655            .get(format!("{}/test", server.url()))
2656            .send()
2657            .await
2658            .unwrap();
2659
2660        let http_client =
2661            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2662                .unwrap()
2663                .with_test_backoff_policy();
2664        let result = http_client
2665            .error_for_response(response)
2666            .await;
2667
2668        mock.assert();
2669        assert!(result.is_ok());
2670
2671        let response = result.unwrap();
2672        assert_eq!(response.status(), 200);
2673    }
2674
2675    #[tokio::test]
2676    async fn test_handle_error_for_backoff_server_unreachable() {
2677        let http_client =
2678            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2679                .unwrap()
2680                .with_test_backoff_policy();
2681        let error = RPCError::ServerUnreachable("Service down".to_string());
2682
2683        let backoff_error = http_client
2684            .handle_error_for_backoff(error)
2685            .await;
2686
2687        match backoff_error {
2688            backoff::Error::Transient { err: RPCError::ServerUnreachable(msg), retry_after } => {
2689                assert_eq!(msg, "Service down");
2690                assert_eq!(retry_after, Some(Duration::from_millis(50))); // Fast test duration
2691            }
2692            _ => panic!("Expected transient error for ServerUnreachable"),
2693        }
2694    }
2695
2696    #[tokio::test]
2697    async fn test_handle_error_for_backoff_rate_limited_with_retry_after() {
2698        let http_client =
2699            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2700                .unwrap()
2701                .with_test_backoff_policy();
2702        let future_time = SystemTime::now() + Duration::from_secs(30);
2703        let error = RPCError::RateLimited(Some(future_time));
2704
2705        let backoff_error = http_client
2706            .handle_error_for_backoff(error)
2707            .await;
2708
2709        match backoff_error {
2710            backoff::Error::Transient { err: RPCError::RateLimited(retry_after), .. } => {
2711                assert_eq!(retry_after, Some(future_time));
2712            }
2713            _ => panic!("Expected transient error for RateLimited"),
2714        }
2715
2716        // Verify that retry_after was stored in the client state
2717        let stored_retry_after = http_client.retry_after.read().await;
2718        assert_eq!(*stored_retry_after, Some(future_time));
2719    }
2720
2721    #[tokio::test]
2722    async fn test_handle_error_for_backoff_rate_limited_no_retry_after() {
2723        let http_client =
2724            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2725                .unwrap()
2726                .with_test_backoff_policy();
2727        let error = RPCError::RateLimited(None);
2728
2729        let backoff_error = http_client
2730            .handle_error_for_backoff(error)
2731            .await;
2732
2733        match backoff_error {
2734            backoff::Error::Transient { err: RPCError::RateLimited(None), .. } => {
2735                // This is expected - no retry-after still allows retries with default policy
2736            }
2737            _ => panic!("Expected transient error for RateLimited without retry-after"),
2738        }
2739    }
2740
2741    #[tokio::test]
2742    async fn test_handle_error_for_backoff_other_errors() {
2743        let http_client =
2744            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2745                .unwrap()
2746                .with_test_backoff_policy();
2747        let error = RPCError::ParseResponse("Invalid JSON".to_string());
2748
2749        let backoff_error = http_client
2750            .handle_error_for_backoff(error)
2751            .await;
2752
2753        match backoff_error {
2754            backoff::Error::Permanent(RPCError::ParseResponse(msg)) => {
2755                assert_eq!(msg, "Invalid JSON");
2756            }
2757            _ => panic!("Expected permanent error for ParseResponse"),
2758        }
2759    }
2760
2761    #[tokio::test]
2762    async fn test_wait_until_retry_after_no_retry_time() {
2763        let http_client =
2764            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2765                .unwrap()
2766                .with_test_backoff_policy();
2767
2768        let start = std::time::Instant::now();
2769        http_client
2770            .wait_until_retry_after()
2771            .await;
2772        let elapsed = start.elapsed();
2773
2774        // Should return immediately if no retry time is set
2775        assert!(elapsed < Duration::from_millis(100));
2776    }
2777
2778    #[tokio::test]
2779    async fn test_wait_until_retry_after_past_time() {
2780        let http_client =
2781            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2782                .unwrap()
2783                .with_test_backoff_policy();
2784
2785        // Set a retry time in the past
2786        let past_time = SystemTime::now() - Duration::from_secs(10);
2787        *http_client.retry_after.write().await = Some(past_time);
2788
2789        let start = std::time::Instant::now();
2790        http_client
2791            .wait_until_retry_after()
2792            .await;
2793        let elapsed = start.elapsed();
2794
2795        // Should return immediately if retry time is in the past
2796        assert!(elapsed < Duration::from_millis(100));
2797    }
2798
2799    #[tokio::test]
2800    async fn test_wait_until_retry_after_future_time() {
2801        let http_client =
2802            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2803                .unwrap()
2804                .with_test_backoff_policy();
2805
2806        // Set a retry time 100ms in the future
2807        let future_time = SystemTime::now() + Duration::from_millis(100);
2808        *http_client.retry_after.write().await = Some(future_time);
2809
2810        let start = std::time::Instant::now();
2811        http_client
2812            .wait_until_retry_after()
2813            .await;
2814        let elapsed = start.elapsed();
2815
2816        // Should wait approximately the specified duration
2817        assert!(elapsed >= Duration::from_millis(80)); // Allow some tolerance
2818        assert!(elapsed <= Duration::from_millis(200)); // Upper bound for test stability
2819    }
2820
2821    #[tokio::test]
2822    async fn test_make_post_request_success() {
2823        let mut server = Server::new_async().await;
2824        let server_resp = r#"{"success": true}"#;
2825
2826        let mock = server
2827            .mock("POST", "/test")
2828            .with_status(200)
2829            .with_body(server_resp)
2830            .create_async()
2831            .await;
2832
2833        let http_client =
2834            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2835                .unwrap()
2836                .with_test_backoff_policy();
2837        let request_body = serde_json::json!({"test": "data"});
2838        let uri = format!("{}/test", server.url());
2839
2840        let result = http_client
2841            .make_post_request(&request_body, &uri)
2842            .await;
2843
2844        mock.assert();
2845        assert!(result.is_ok());
2846
2847        let response = result.unwrap();
2848        assert_eq!(response.status(), 200);
2849        assert_eq!(response.text().await.unwrap(), server_resp);
2850    }
2851
2852    #[tokio::test]
2853    async fn test_make_post_request_retry_on_server_error() {
2854        let mut server = Server::new_async().await;
2855        // First request fails with 503, second succeeds
2856        let error_mock = server
2857            .mock("POST", "/test")
2858            .with_status(503)
2859            .with_body("Service Unavailable")
2860            .expect(1)
2861            .create_async()
2862            .await;
2863
2864        let success_mock = server
2865            .mock("POST", "/test")
2866            .with_status(200)
2867            .with_body(r#"{"success": true}"#)
2868            .expect(1)
2869            .create_async()
2870            .await;
2871
2872        let http_client =
2873            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2874                .unwrap()
2875                .with_test_backoff_policy();
2876        let request_body = serde_json::json!({"test": "data"});
2877        let uri = format!("{}/test", server.url());
2878
2879        let result = http_client
2880            .make_post_request(&request_body, &uri)
2881            .await;
2882
2883        error_mock.assert();
2884        success_mock.assert();
2885        assert!(result.is_ok());
2886    }
2887
2888    #[tokio::test]
2889    async fn test_make_post_request_respect_retry_after_header() {
2890        let mut server = Server::new_async().await;
2891
2892        // First request returns 429 with retry-after, second succeeds
2893        let rate_limit_mock = server
2894            .mock("POST", "/test")
2895            .with_status(429)
2896            .with_header("Retry-After", "1") // 1 second
2897            .expect(1)
2898            .create_async()
2899            .await;
2900
2901        let success_mock = server
2902            .mock("POST", "/test")
2903            .with_status(200)
2904            .with_body(r#"{"success": true}"#)
2905            .expect(1)
2906            .create_async()
2907            .await;
2908
2909        let http_client =
2910            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2911                .unwrap()
2912                .with_test_backoff_policy();
2913        let request_body = serde_json::json!({"test": "data"});
2914        let uri = format!("{}/test", server.url());
2915
2916        let start = std::time::Instant::now();
2917        let result = http_client
2918            .make_post_request(&request_body, &uri)
2919            .await;
2920        let elapsed = start.elapsed();
2921
2922        rate_limit_mock.assert();
2923        success_mock.assert();
2924        assert!(result.is_ok());
2925
2926        // Should have waited at least 1 second due to retry-after header
2927        assert!(elapsed >= Duration::from_millis(900)); // Allow some tolerance
2928        assert!(elapsed <= Duration::from_millis(2000)); // Upper bound for test stability
2929    }
2930
2931    #[tokio::test]
2932    async fn test_make_post_request_permanent_error() {
2933        let mut server = Server::new_async().await;
2934
2935        let mock = server
2936            .mock("POST", "/test")
2937            .with_status(400) // Bad Request - should not be retried
2938            .with_body("Bad Request")
2939            .expect(1)
2940            .create_async()
2941            .await;
2942
2943        let http_client =
2944            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2945                .unwrap()
2946                .with_test_backoff_policy();
2947        let request_body = serde_json::json!({"test": "data"});
2948        let uri = format!("{}/test", server.url());
2949
2950        let result = http_client
2951            .make_post_request(&request_body, &uri)
2952            .await;
2953
2954        mock.assert();
2955        assert!(result.is_ok()); // 400 doesn't trigger retry logic, just returns the response
2956
2957        let response = result.unwrap();
2958        assert_eq!(response.status(), 400);
2959    }
2960
2961    #[tokio::test]
2962    async fn test_concurrent_requests_with_different_retry_after() {
2963        let mut server = Server::new_async().await;
2964
2965        // First request gets rate limited with 1 second retry-after
2966        let rate_limit_mock_1 = server
2967            .mock("POST", "/test1")
2968            .with_status(429)
2969            .with_header("Retry-After", "1")
2970            .expect(1)
2971            .create_async()
2972            .await;
2973
2974        // Second request gets rate limited with 2 second retry-after
2975        let rate_limit_mock_2 = server
2976            .mock("POST", "/test2")
2977            .with_status(429)
2978            .with_header("Retry-After", "2")
2979            .expect(1)
2980            .create_async()
2981            .await;
2982
2983        // Success mocks for retries
2984        let success_mock_1 = server
2985            .mock("POST", "/test1")
2986            .with_status(200)
2987            .with_body(r#"{"result": "success1"}"#)
2988            .expect(1)
2989            .create_async()
2990            .await;
2991
2992        let success_mock_2 = server
2993            .mock("POST", "/test2")
2994            .with_status(200)
2995            .with_body(r#"{"result": "success2"}"#)
2996            .expect(1)
2997            .create_async()
2998            .await;
2999
3000        let http_client =
3001            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3002                .unwrap()
3003                .with_test_backoff_policy();
3004        let request_body = serde_json::json!({"test": "data"});
3005
3006        let uri1 = format!("{}/test1", server.url());
3007        let uri2 = format!("{}/test2", server.url());
3008
3009        // Start both requests concurrently
3010        let start = std::time::Instant::now();
3011        let (result1, result2) = tokio::join!(
3012            http_client.make_post_request(&request_body, &uri1),
3013            http_client.make_post_request(&request_body, &uri2)
3014        );
3015        let elapsed = start.elapsed();
3016
3017        rate_limit_mock_1.assert();
3018        rate_limit_mock_2.assert();
3019        success_mock_1.assert();
3020        success_mock_2.assert();
3021
3022        assert!(result1.is_ok());
3023        assert!(result2.is_ok());
3024
3025        // Both requests should succeed, but the second should take longer due to the 2s retry-after
3026        // The total time should be at least 2 seconds since the shared retry_after state
3027        // gets updated by both requests
3028        assert!(elapsed >= Duration::from_millis(1800)); // Allow some tolerance
3029        assert!(elapsed <= Duration::from_millis(3000)); // Upper bound
3030
3031        // Check the final retry_after state - should be the latest (higher) value
3032        let final_retry_after = http_client.retry_after.read().await;
3033        assert!(final_retry_after.is_some());
3034
3035        // The retry_after should be set to the latest (higher) value from the two requests
3036        if let Some(retry_time) = *final_retry_after {
3037            // The retry_after time might be in the past now since we waited,
3038            // but it should be reasonable (not too far in past/future)
3039            let now = SystemTime::now();
3040            let diff = if retry_time > now {
3041                retry_time.duration_since(now).unwrap()
3042            } else {
3043                now.duration_since(retry_time).unwrap()
3044            };
3045
3046            // Should be within a reasonable range (the 2s retry-after plus some buffer)
3047            assert!(diff <= Duration::from_secs(3), "Retry time difference too large: {:?}", diff);
3048        }
3049    }
3050
3051    #[tokio::test]
3052    async fn test_get_snapshots() {
3053        let mut server = Server::new_async().await;
3054
3055        // Mock protocol states response
3056        let protocol_states_resp = r#"
3057        {
3058            "states": [
3059                {
3060                    "component_id": "component1",
3061                    "attributes": {
3062                        "attribute_1": "0x00000000000003e8"
3063                    },
3064                    "balances": {
3065                        "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
3066                    }
3067                }
3068            ],
3069            "pagination": {
3070                "page": 0,
3071                "page_size": 100,
3072                "total": 1
3073            }
3074        }
3075        "#;
3076
3077        // Mock contract state response
3078        let contract_state_resp = r#"
3079        {
3080            "accounts": [
3081                {
3082                    "chain": "ethereum",
3083                    "address": "0x1111111111111111111111111111111111111111",
3084                    "title": "",
3085                    "slots": {},
3086                    "native_balance": "0x01f4",
3087                    "token_balances": {},
3088                    "code": "0x00",
3089                    "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
3090                    "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3091                    "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3092                    "creation_tx": null
3093                }
3094            ],
3095            "pagination": {
3096                "page": 0,
3097                "page_size": 100,
3098                "total": 1
3099            }
3100        }
3101        "#;
3102
3103        // Mock component TVL response
3104        let tvl_resp = r#"
3105        {
3106            "tvl": {
3107                "component1": 1000000.0
3108            },
3109            "pagination": {
3110                "page": 0,
3111                "page_size": 100,
3112                "total": 1
3113            }
3114        }
3115        "#;
3116
3117        let protocol_states_mock = server
3118            .mock("POST", "/v1/protocol_state")
3119            .expect(1)
3120            .with_body(protocol_states_resp)
3121            .create_async()
3122            .await;
3123
3124        let contract_state_mock = server
3125            .mock("POST", "/v1/contract_state")
3126            .expect(1)
3127            .with_body(contract_state_resp)
3128            .create_async()
3129            .await;
3130
3131        let tvl_mock = server
3132            .mock("POST", "/v1/component_tvl")
3133            .expect(1)
3134            .with_body(tvl_resp)
3135            .create_async()
3136            .await;
3137
3138        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3139            .expect("create client");
3140
3141        let component = tycho_common::models::protocol::ProtocolComponent {
3142            id: "component1".to_string(),
3143            protocol_system: "test_protocol".to_string(),
3144            protocol_type_name: "test_type".to_string(),
3145            chain: Chain::Ethereum,
3146            tokens: vec![],
3147            contract_addresses: vec![
3148                Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()
3149            ],
3150            static_attributes: HashMap::new(),
3151            change: tycho_common::models::ChangeType::Creation,
3152            creation_tx: Bytes::from_str(
3153                "0x0000000000000000000000000000000000000000000000000000000000000000",
3154            )
3155            .unwrap(),
3156            created_at: chrono::Utc::now().naive_utc(),
3157        };
3158
3159        let mut components = HashMap::new();
3160        components.insert("component1".to_string(), component);
3161
3162        let contract_ids =
3163            vec![Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()];
3164
3165        let request = SnapshotParameters::new(
3166            Chain::Ethereum,
3167            "test_protocol",
3168            &components,
3169            &contract_ids,
3170            12345,
3171        );
3172
3173        let response = client
3174            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3175            .await
3176            .expect("get snapshots");
3177
3178        // Verify all mocks were called
3179        protocol_states_mock.assert();
3180        contract_state_mock.assert();
3181        tvl_mock.assert();
3182
3183        // Assert states
3184        assert_eq!(response.states.len(), 1);
3185        assert!(response
3186            .states
3187            .contains_key("component1"));
3188
3189        // Check that the state has the expected TVL
3190        let component_state = response
3191            .states
3192            .get("component1")
3193            .unwrap();
3194        assert_eq!(component_state.component_tvl, Some(1000000.0));
3195
3196        // Assert VM storage
3197        assert_eq!(response.vm_storage.len(), 1);
3198        let contract_addr = Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap();
3199        assert!(response
3200            .vm_storage
3201            .contains_key(&contract_addr));
3202    }
3203
3204    #[tokio::test]
3205    async fn test_get_snapshots_empty_components() {
3206        let server = Server::new_async().await;
3207        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3208            .expect("create client");
3209
3210        let components = HashMap::new();
3211        let contract_ids = vec![];
3212
3213        let request = SnapshotParameters::new(
3214            Chain::Ethereum,
3215            "test_protocol",
3216            &components,
3217            &contract_ids,
3218            12345,
3219        );
3220
3221        let response = client
3222            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3223            .await
3224            .expect("get snapshots");
3225
3226        // Should return empty response without making any requests
3227        assert!(response.states.is_empty());
3228        assert!(response.vm_storage.is_empty());
3229    }
3230
3231    #[tokio::test]
3232    async fn test_get_snapshots_without_tvl() {
3233        let mut server = Server::new_async().await;
3234
3235        let protocol_states_resp = r#"
3236        {
3237            "states": [
3238                {
3239                    "component_id": "component1",
3240                    "attributes": {},
3241                    "balances": {}
3242                }
3243            ],
3244            "pagination": {
3245                "page": 0,
3246                "page_size": 100,
3247                "total": 1
3248            }
3249        }
3250        "#;
3251
3252        let protocol_states_mock = server
3253            .mock("POST", "/v1/protocol_state")
3254            .expect(1)
3255            .with_body(protocol_states_resp)
3256            .create_async()
3257            .await;
3258
3259        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3260            .expect("create client");
3261
3262        // Create test component
3263        let component = tycho_common::models::protocol::ProtocolComponent {
3264            id: "component1".to_string(),
3265            protocol_system: "test_protocol".to_string(),
3266            protocol_type_name: "test_type".to_string(),
3267            chain: Chain::Ethereum,
3268            tokens: vec![],
3269            contract_addresses: vec![],
3270            static_attributes: HashMap::new(),
3271            change: tycho_common::models::ChangeType::Creation,
3272            creation_tx: Bytes::from_str(
3273                "0x0000000000000000000000000000000000000000000000000000000000000000",
3274            )
3275            .unwrap(),
3276            created_at: chrono::Utc::now().naive_utc(),
3277        };
3278
3279        let mut components = HashMap::new();
3280        components.insert("component1".to_string(), component);
3281        let contract_ids = vec![];
3282
3283        let request = SnapshotParameters::new(
3284            Chain::Ethereum,
3285            "test_protocol",
3286            &components,
3287            &contract_ids,
3288            12345,
3289        )
3290        .include_balances(false)
3291        .include_tvl(false);
3292
3293        let response = client
3294            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3295            .await
3296            .expect("get snapshots");
3297
3298        // Verify only necessary mocks were called
3299        protocol_states_mock.assert();
3300        // No contract_state_mock.assert() since contract_ids is empty
3301        // No tvl_mock.assert() since include_tvl is false
3302
3303        assert_eq!(response.states.len(), 1);
3304        // Check that TVL is None since we didn't request it
3305        let component_state = response
3306            .states
3307            .get("component1")
3308            .unwrap();
3309        assert_eq!(component_state.component_tvl, None);
3310    }
3311
3312    #[tokio::test]
3313    async fn test_compression_enabled() {
3314        let mut server = Server::new_async().await;
3315        let server_resp = GET_CONTRACT_STATE_RESP;
3316
3317        // Compress the response using zstd
3318        let compressed_body =
3319            zstd::encode_all(server_resp.as_bytes(), 0).expect("compression failed");
3320
3321        let mocked_server = server
3322            .mock("POST", "/v1/contract_state")
3323            .expect(1)
3324            .with_header("Content-Encoding", "zstd")
3325            .with_body(compressed_body)
3326            .create_async()
3327            .await;
3328
3329        // Create client with compression enabled
3330        let client = HttpRPCClient::new(
3331            server.url().as_str(),
3332            HttpRPCClientOptions::new().with_compression(true),
3333        )
3334        .expect("create client");
3335
3336        let response = client
3337            .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3338            .await
3339            .expect("get state");
3340        let accounts = response;
3341
3342        mocked_server.assert();
3343        assert_eq!(accounts.data().len(), 1);
3344        assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3345    }
3346
3347    #[tokio::test]
3348    async fn test_compression_disabled() {
3349        let mut server = Server::new_async().await;
3350        let server_resp = GET_CONTRACT_STATE_RESP;
3351
3352        // Verify client does NOT send Accept-Encoding: zstd when compression is disabled
3353        // Instead, server should receive request without compression headers
3354        let mocked_server = server
3355            .mock("POST", "/v1/contract_state")
3356            .expect(1)
3357            .match_header("Accept-Encoding", mockito::Matcher::Missing)
3358            .with_status(200)
3359            .with_body(server_resp)
3360            .create_async()
3361            .await;
3362
3363        // Create client with compression disabled
3364        let client = HttpRPCClient::new(
3365            server.url().as_str(),
3366            HttpRPCClientOptions::new().with_compression(false),
3367        )
3368        .expect("create client");
3369
3370        let response = client
3371            .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3372            .await
3373            .expect("get state");
3374        let accounts = response;
3375
3376        // Verify the mock was called (client sent request without Accept-Encoding header)
3377        mocked_server.assert();
3378        assert_eq!(accounts.data().len(), 1);
3379        assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3380    }
3381
3382    #[rstest]
3383    #[case::single_page(2, 1000)]
3384    #[case::multiple_pages_within_concurrency(10, 2)]
3385    #[case::exceeds_concurrency_limit(60, 2)]
3386    #[tokio::test]
3387    async fn test_get_all_tokens_pagination_and_concurrency(
3388        #[case] total_tokens: usize,
3389        #[case] page_size: usize,
3390    ) {
3391        use std::sync::atomic::{AtomicUsize, Ordering};
3392
3393        let allowed_concurrency = 10;
3394
3395        let concurrent_requests = Arc::new(AtomicUsize::new(0));
3396        let max_concurrent = Arc::new(AtomicUsize::new(0));
3397
3398        let mut server = Server::new_async().await;
3399
3400        let total_pages = (total_tokens as f64 / page_size as f64).ceil() as i64;
3401
3402        // Mock all required pages
3403        for page in 0..total_pages {
3404            let concurrent = concurrent_requests.clone();
3405            let max_conc = max_concurrent.clone();
3406
3407            let tokens_in_page = {
3408                let start_idx = (page as usize) * page_size;
3409                let end_idx = ((page as usize + 1) * page_size).min(total_tokens);
3410                (start_idx..end_idx)
3411                    .map(|i| {
3412                        format!(
3413                            r#"{{
3414                            "chain": "ethereum",
3415                            "address": "0x{i:040x}",
3416                            "symbol": "TOKEN_{i}",
3417                            "decimals": 18,
3418                            "tax": 0,
3419                            "gas": [30000],
3420                            "quality": 100
3421                        }}"#
3422                        )
3423                    })
3424                    .collect::<Vec<_>>()
3425            };
3426
3427            let tokens_json = tokens_in_page.join(",");
3428            let response = format!(
3429                r#"{{
3430                    "tokens": [{tokens_json}],
3431                    "pagination": {{
3432                        "page": {page},
3433                        "page_size": {page_size},
3434                        "total": {total_tokens}
3435                    }}
3436                }}"#,
3437            );
3438
3439            server
3440                .mock("POST", "/v1/tokens")
3441                .expect(1)
3442                .with_chunked_body(move |w| {
3443                    // Track concurrent requests
3444                    let current = concurrent.fetch_add(1, Ordering::SeqCst);
3445                    max_conc.fetch_max(current + 1, Ordering::SeqCst);
3446
3447                    // Simulate some work to increase likelihood of concurrent requests
3448                    std::thread::sleep(Duration::from_millis(10));
3449
3450                    concurrent.fetch_sub(1, Ordering::SeqCst);
3451
3452                    w.write_all(response.as_bytes())
3453                })
3454                .create_async()
3455                .await;
3456        }
3457
3458        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3459            .expect("create client");
3460
3461        let tokens = client
3462            .get_all_tokens(
3463                AllTokensParams::new(Chain::Ethereum, allowed_concurrency)
3464                    .with_chunk_size(page_size),
3465            )
3466            .await
3467            .expect("get all tokens");
3468
3469        // Verify concurrency was respected
3470        let max = max_concurrent.load(Ordering::SeqCst);
3471        let expected_max_concurrency = (total_pages as usize)
3472            .saturating_sub(1)
3473            .min(allowed_concurrency);
3474        assert!(
3475            max <= allowed_concurrency,
3476            "Expected max concurrent requests <= {allowed_concurrency}, got {max}"
3477        );
3478
3479        // For cases with multiple pages, verify we actually used concurrency
3480        if total_pages > 1 && expected_max_concurrency > 1 {
3481            assert!(
3482                max > 0,
3483                "Expected some concurrent requests for multi-page response, got {max}"
3484            );
3485        }
3486
3487        // Verify we got all expected tokens
3488        assert_eq!(
3489            tokens.len(),
3490            total_tokens,
3491            "Expected {total_tokens} tokens, got {}",
3492            tokens.len()
3493        );
3494
3495        // Verify tokens are in the expected order
3496        for (i, token) in tokens.iter().enumerate() {
3497            assert_eq!(token.symbol, format!("TOKEN_{i}"), "Token at index {i} has wrong symbol");
3498        }
3499    }
3500}