Skip to main content

tycho_client/
rpc.rs

1//! # Tycho RPC Client
2//!
3//! The objective of this module is to provide swift and simplified access to the Remote Procedure
4//! Call (RPC) endpoints of Tycho. These endpoints are chiefly responsible for facilitating data
5//! queries, especially querying snapshots of data.
6use std::{
7    collections::HashMap,
8    sync::Arc,
9    time::{Duration, SystemTime},
10};
11
12use async_trait::async_trait;
13use backoff::{exponential::ExponentialBackoffBuilder, ExponentialBackoff};
14use futures03::future::try_join_all;
15#[cfg(test)]
16use mockall::automock;
17use reqwest::{header, Client, ClientBuilder, Response, StatusCode, Url};
18use serde::Serialize;
19use thiserror::Error;
20use time::{format_description::well_known::Rfc2822, OffsetDateTime};
21use tokio::{
22    sync::{RwLock, Semaphore},
23    time::sleep,
24};
25use tracing::{debug, error, instrument, trace, warn};
26use tycho_common::{
27    dto::{
28        ComponentTvlRequestBody, ComponentTvlRequestResponse, PaginationLimits, PaginationParams,
29        ProtocolComponentRequestResponse, ProtocolComponentsRequestBody, ProtocolStateRequestBody,
30        ProtocolStateRequestResponse, ProtocolSystemsRequestBody, ProtocolSystemsRequestResponse,
31        StateRequestBody, StateRequestResponse, TokensRequestBody, TokensRequestResponse,
32        TracedEntryPointRequestBody, TracedEntryPointRequestResponse, VersionParam,
33    },
34    models::{
35        blockchain::{EntryPointWithTracingParams, TracedEntryPoints, TracingResult},
36        contract::Account,
37        protocol::{ProtocolComponent, ProtocolComponentState},
38        token::Token,
39        Chain, ComponentId,
40    },
41    Bytes,
42};
43
44/// Data payload returned by `RPCClient::get_protocol_systems`.
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct ProtocolSystems {
47    protocol_systems: Vec<String>,
48    dci_protocols: Vec<String>,
49}
50
51impl ProtocolSystems {
52    pub(crate) fn new(protocol_systems: Vec<String>, dci_protocols: Vec<String>) -> Self {
53        Self { protocol_systems, dci_protocols }
54    }
55
56    pub fn protocol_systems(&self) -> &[String] {
57        &self.protocol_systems
58    }
59
60    pub fn dci_protocols(&self) -> &[String] {
61        &self.dci_protocols
62    }
63}
64
65/// An RPC response page, bundling data with the pagination metadata the server returned.
66#[derive(Debug, Clone, PartialEq)]
67pub struct Page<T> {
68    data: T,
69    total: i64,
70    page: i64,
71    page_size: i64,
72}
73
74impl<T> Page<T> {
75    pub fn new(data: T, total: i64, page: i64, page_size: i64) -> Self {
76        Page { data, total, page, page_size }
77    }
78
79    pub fn data(&self) -> &T {
80        &self.data
81    }
82
83    pub fn into_data(self) -> T {
84        self.data
85    }
86
87    pub fn total(&self) -> i64 {
88        self.total
89    }
90
91    pub fn page(&self) -> i64 {
92        self.page
93    }
94
95    pub fn page_size(&self) -> i64 {
96        self.page_size
97    }
98}
99
100impl<T> Page<Vec<T>> {
101    pub fn len(&self) -> usize {
102        self.data.len()
103    }
104
105    pub fn is_empty(&self) -> bool {
106        self.data.is_empty()
107    }
108}
109
110impl<K, V, S: std::hash::BuildHasher> Page<HashMap<K, V, S>> {
111    pub fn len(&self) -> usize {
112        self.data.len()
113    }
114
115    pub fn is_empty(&self) -> bool {
116        self.data.is_empty()
117    }
118}
119
120impl<T: IntoIterator> IntoIterator for Page<T> {
121    type Item = T::Item;
122    type IntoIter = T::IntoIter;
123
124    fn into_iter(self) -> Self::IntoIter {
125        self.data.into_iter()
126    }
127}
128
129impl<'a, T> IntoIterator for &'a Page<T>
130where
131    &'a T: IntoIterator,
132{
133    type Item = <&'a T as IntoIterator>::Item;
134    type IntoIter = <&'a T as IntoIterator>::IntoIter;
135
136    fn into_iter(self) -> Self::IntoIter {
137        (&self.data).into_iter()
138    }
139}
140
141use crate::{
142    feed::synchronizer::{ComponentWithState, Snapshot},
143    TYCHO_SERVER_VERSION,
144};
145
146/// Suggested concurrency level for RPC clients.
147pub const RPC_CLIENT_CONCURRENCY: usize = 4;
148
149/// Parameters for [`RPCClient::get_contract_state`].
150#[derive(Clone, PartialEq, Debug)]
151pub struct ContractStateParams {
152    chain: Chain,
153    protocol_system: String,
154    contract_ids: Option<Vec<Bytes>>,
155    version: VersionParam,
156    page: i64,
157    page_size: i64,
158}
159
160impl ContractStateParams {
161    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
162        Self {
163            chain,
164            protocol_system: protocol_system.into(),
165            contract_ids: None,
166            version: VersionParam::default(),
167            page: 0,
168            page_size: StateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
169        }
170    }
171
172    pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
173        self.contract_ids = Some(ids);
174        self
175    }
176
177    pub fn with_version(mut self, version: VersionParam) -> Self {
178        self.version = version;
179        self
180    }
181
182    pub fn with_block_number(mut self, block_number: u64) -> Self {
183        self.version = VersionParam::at_block(self.chain.into(), block_number);
184        self
185    }
186
187    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
188        self.page = page;
189        self.page_size = page_size;
190        self
191    }
192}
193
194/// Parameters for [`RPCClient::get_protocol_components`].
195#[derive(Clone, PartialEq, Debug)]
196pub struct ProtocolComponentsParams {
197    chain: Chain,
198    protocol_system: String,
199    component_ids: Option<Vec<ComponentId>>,
200    tvl_gt: Option<f64>,
201    page: i64,
202    page_size: i64,
203}
204
205impl ProtocolComponentsParams {
206    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
207        Self {
208            chain,
209            protocol_system: protocol_system.into(),
210            component_ids: None,
211            tvl_gt: None,
212            page: 0,
213            page_size: ProtocolComponentsRequestBody::MAX_PAGE_SIZE_COMPRESSED,
214        }
215    }
216
217    pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
218        self.component_ids = Some(ids);
219        self
220    }
221
222    pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
223        self.tvl_gt = Some(tvl_gt);
224        self
225    }
226
227    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
228        self.page = page;
229        self.page_size = page_size;
230        self
231    }
232
233    #[cfg(test)]
234    pub(crate) fn component_ids(&self) -> Option<&Vec<ComponentId>> {
235        self.component_ids.as_ref()
236    }
237}
238
239/// Parameters for [`RPCClient::get_protocol_states`].
240#[derive(Clone, PartialEq, Debug)]
241pub struct ProtocolStatesParams {
242    chain: Chain,
243    protocol_system: String,
244    protocol_ids: Option<Vec<String>>,
245    include_balances: bool,
246    version: VersionParam,
247    page: i64,
248    page_size: i64,
249}
250
251impl ProtocolStatesParams {
252    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
253        Self {
254            chain,
255            protocol_system: protocol_system.into(),
256            protocol_ids: None,
257            include_balances: false,
258            version: VersionParam::default(),
259            page: 0,
260            page_size: ProtocolStateRequestBody::MAX_PAGE_SIZE_COMPRESSED,
261        }
262    }
263
264    pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
265        self.protocol_ids = Some(ids);
266        self
267    }
268
269    pub fn with_include_balances(mut self, include_balances: bool) -> Self {
270        self.include_balances = include_balances;
271        self
272    }
273
274    pub fn with_version(mut self, version: VersionParam) -> Self {
275        self.version = version;
276        self
277    }
278
279    pub fn with_block_number(mut self, block_number: u64) -> Self {
280        self.version = VersionParam::at_block(self.chain.into(), block_number);
281        self
282    }
283
284    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
285        self.page = page;
286        self.page_size = page_size;
287        self
288    }
289}
290
291/// Parameters for [`RPCClient::get_tokens`].
292#[derive(Clone, PartialEq, Debug)]
293pub struct TokensParams {
294    chain: Chain,
295    min_quality: Option<i32>,
296    traded_n_days_ago: Option<u64>,
297    page: i64,
298    page_size: i64,
299}
300
301impl TokensParams {
302    pub fn new(chain: Chain) -> Self {
303        Self {
304            chain,
305            min_quality: None,
306            traded_n_days_ago: None,
307            page: 0,
308            page_size: TokensRequestBody::MAX_PAGE_SIZE_COMPRESSED,
309        }
310    }
311
312    pub fn with_min_quality(mut self, min_quality: i32) -> Self {
313        self.min_quality = Some(min_quality);
314        self
315    }
316
317    pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
318        self.traded_n_days_ago = Some(days);
319        self
320    }
321
322    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
323        self.page = page;
324        self.page_size = page_size;
325        self
326    }
327}
328
329/// Parameters for [`RPCClient::get_protocol_systems`].
330#[derive(Clone, PartialEq, Debug)]
331pub struct ProtocolSystemsParams {
332    chain: Chain,
333    page: i64,
334    page_size: i64,
335}
336
337impl ProtocolSystemsParams {
338    pub fn new(chain: Chain) -> Self {
339        Self { chain, page: 0, page_size: ProtocolSystemsRequestBody::MAX_PAGE_SIZE_COMPRESSED }
340    }
341
342    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
343        self.page = page;
344        self.page_size = page_size;
345        self
346    }
347}
348
349/// Parameters for [`RPCClient::get_component_tvl`].
350#[derive(Clone, PartialEq, Debug)]
351pub struct ComponentTvlParams {
352    chain: Chain,
353    protocol_system: Option<String>,
354    component_ids: Option<Vec<String>>,
355    page: i64,
356    page_size: i64,
357}
358
359impl ComponentTvlParams {
360    pub fn new(chain: Chain) -> Self {
361        Self {
362            chain,
363            protocol_system: None,
364            component_ids: None,
365            page: 0,
366            page_size: ComponentTvlRequestBody::MAX_PAGE_SIZE_COMPRESSED,
367        }
368    }
369
370    pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
371        self.protocol_system = Some(protocol_system.into());
372        self
373    }
374
375    pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
376        self.component_ids = Some(ids);
377        self
378    }
379
380    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
381        self.page = page;
382        self.page_size = page_size;
383        self
384    }
385}
386
387/// Parameters for [`RPCClient::get_traced_entry_points`].
388#[derive(Clone, PartialEq, Debug)]
389pub struct TracedEntryPointsParams {
390    chain: Chain,
391    protocol_system: String,
392    component_ids: Option<Vec<String>>,
393    page: i64,
394    page_size: i64,
395}
396
397impl TracedEntryPointsParams {
398    pub fn new(chain: Chain, protocol_system: impl Into<String>) -> Self {
399        Self {
400            chain,
401            protocol_system: protocol_system.into(),
402            component_ids: None,
403            page: 0,
404            page_size: TracedEntryPointRequestBody::MAX_PAGE_SIZE_COMPRESSED,
405        }
406    }
407
408    pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
409        self.component_ids = Some(ids);
410        self
411    }
412
413    pub(crate) fn with_pagination(mut self, page: i64, page_size: i64) -> Self {
414        self.page = page;
415        self.page_size = page_size;
416        self
417    }
418}
419
420/// Parameters for [`RPCClient::get_protocol_components_paginated`].
421#[derive(Clone, PartialEq, Debug)]
422pub struct ProtocolComponentsPaginatedParams {
423    chain: Chain,
424    protocol_system: String,
425    component_ids: Option<Vec<ComponentId>>,
426    tvl_gt: Option<f64>,
427    chunk_size: Option<usize>,
428    concurrency: usize,
429}
430
431impl ProtocolComponentsPaginatedParams {
432    pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
433        Self {
434            chain,
435            protocol_system: protocol_system.into(),
436            component_ids: None,
437            tvl_gt: None,
438            chunk_size: None,
439            concurrency,
440        }
441    }
442
443    pub fn with_component_ids(mut self, ids: Vec<ComponentId>) -> Self {
444        self.component_ids = Some(ids);
445        self
446    }
447
448    pub fn with_tvl_gt(mut self, tvl_gt: f64) -> Self {
449        self.tvl_gt = Some(tvl_gt);
450        self
451    }
452
453    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
454        self.chunk_size = Some(chunk_size);
455        self
456    }
457}
458
459/// Parameters for [`RPCClient::get_traced_entry_points_paginated`].
460#[derive(Clone, PartialEq, Debug)]
461pub struct TracedEntryPointsPaginatedParams {
462    chain: Chain,
463    protocol_system: String,
464    component_ids: Vec<String>,
465    chunk_size: Option<usize>,
466    concurrency: usize,
467}
468
469impl TracedEntryPointsPaginatedParams {
470    pub fn new(
471        chain: Chain,
472        protocol_system: impl Into<String>,
473        component_ids: Vec<String>,
474        concurrency: usize,
475    ) -> Self {
476        Self {
477            chain,
478            protocol_system: protocol_system.into(),
479            component_ids,
480            chunk_size: None,
481            concurrency,
482        }
483    }
484
485    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
486        self.chunk_size = Some(chunk_size);
487        self
488    }
489}
490
491/// Parameters for [`RPCClient::get_protocol_states_paginated`].
492#[derive(Clone, PartialEq, Debug)]
493pub struct ProtocolStatesPaginatedParams {
494    chain: Chain,
495    protocol_system: String,
496    protocol_ids: Vec<String>,
497    include_balances: bool,
498    version: VersionParam,
499    chunk_size: Option<usize>,
500    concurrency: usize,
501}
502
503impl ProtocolStatesPaginatedParams {
504    pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
505        Self {
506            chain,
507            protocol_system: protocol_system.into(),
508            protocol_ids: Vec::new(),
509            include_balances: true,
510            version: VersionParam::default(),
511            chunk_size: None,
512            concurrency,
513        }
514    }
515
516    pub fn with_protocol_ids(mut self, ids: Vec<String>) -> Self {
517        self.protocol_ids = ids;
518        self
519    }
520
521    pub fn with_include_balances(mut self, include_balances: bool) -> Self {
522        self.include_balances = include_balances;
523        self
524    }
525
526    pub fn with_version(mut self, version: VersionParam) -> Self {
527        self.version = version;
528        self
529    }
530
531    pub fn with_block_number(mut self, block_number: u64) -> Self {
532        self.version = VersionParam::at_block(self.chain.into(), block_number);
533        self
534    }
535
536    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
537        self.chunk_size = Some(chunk_size);
538        self
539    }
540}
541
542/// Parameters for [`RPCClient::get_all_tokens`].
543#[derive(Clone, PartialEq, Debug)]
544pub struct AllTokensParams {
545    chain: Chain,
546    min_quality: Option<i32>,
547    traded_n_days_ago: Option<u64>,
548    chunk_size: Option<usize>,
549    concurrency: usize,
550}
551
552impl AllTokensParams {
553    pub fn new(chain: Chain, concurrency: usize) -> Self {
554        Self { chain, min_quality: None, traded_n_days_ago: None, chunk_size: None, concurrency }
555    }
556
557    pub fn with_min_quality(mut self, min_quality: i32) -> Self {
558        self.min_quality = Some(min_quality);
559        self
560    }
561
562    pub fn with_traded_n_days_ago(mut self, days: u64) -> Self {
563        self.traded_n_days_ago = Some(days);
564        self
565    }
566
567    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
568        self.chunk_size = Some(chunk_size);
569        self
570    }
571}
572
573/// Parameters for [`RPCClient::get_component_tvl_paginated`].
574#[derive(Clone, PartialEq, Debug)]
575pub struct ComponentTvlPaginatedParams {
576    chain: Chain,
577    protocol_system: Option<String>,
578    component_ids: Option<Vec<String>>,
579    chunk_size: Option<usize>,
580    concurrency: usize,
581}
582
583impl ComponentTvlPaginatedParams {
584    pub fn new(chain: Chain, concurrency: usize) -> Self {
585        Self { chain, protocol_system: None, component_ids: None, chunk_size: None, concurrency }
586    }
587
588    pub fn with_protocol_system(mut self, protocol_system: impl Into<String>) -> Self {
589        self.protocol_system = Some(protocol_system.into());
590        self
591    }
592
593    pub fn with_component_ids(mut self, ids: Vec<String>) -> Self {
594        self.component_ids = Some(ids);
595        self
596    }
597
598    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
599        self.chunk_size = Some(chunk_size);
600        self
601    }
602}
603
604/// Parameters for [`RPCClient::get_contract_state_paginated`].
605#[derive(Clone, PartialEq, Debug)]
606pub struct ContractStatePaginatedParams {
607    chain: Chain,
608    protocol_system: String,
609    contract_ids: Vec<Bytes>,
610    version: VersionParam,
611    chunk_size: Option<usize>,
612    concurrency: usize,
613}
614
615impl ContractStatePaginatedParams {
616    pub fn new(chain: Chain, protocol_system: impl Into<String>, concurrency: usize) -> Self {
617        Self {
618            chain,
619            protocol_system: protocol_system.into(),
620            contract_ids: Vec::new(),
621            version: VersionParam::default(),
622            chunk_size: None,
623            concurrency,
624        }
625    }
626
627    pub fn with_contract_ids(mut self, ids: Vec<Bytes>) -> Self {
628        self.contract_ids = ids;
629        self
630    }
631
632    pub fn with_version(mut self, version: VersionParam) -> Self {
633        self.version = version;
634        self
635    }
636
637    pub fn with_block_number(mut self, block_number: u64) -> Self {
638        self.version = VersionParam::at_block(self.chain.into(), block_number);
639        self
640    }
641
642    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
643        self.chunk_size = Some(chunk_size);
644        self
645    }
646}
647
648/// Request body for fetching a snapshot of protocol states and VM storage.
649///
650/// This struct helps to coordinate fetching  multiple pieces of related data
651/// (protocol states, contract storage, TVL, entry points).
652#[derive(Clone, Debug, PartialEq)]
653pub struct SnapshotParameters<'a> {
654    /// Which chain to fetch snapshots for
655    pub chain: Chain,
656    /// Protocol system name, required for correct state resolution
657    pub protocol_system: &'a str,
658    /// Components to fetch protocol states for
659    pub components: &'a HashMap<ComponentId, ProtocolComponent>,
660    /// Traced entry points data mapped by component id (model types)
661    pub entrypoints: Option<&'a TracedEntryPoints>,
662    /// Contract addresses to fetch VM storage for
663    pub contract_ids: &'a [Bytes],
664    /// Block number for versioning
665    pub block_number: u64,
666    /// Whether to include balance information
667    pub include_balances: bool,
668    /// Whether to fetch TVL data
669    pub include_tvl: bool,
670}
671
672impl<'a> SnapshotParameters<'a> {
673    pub fn new(
674        chain: Chain,
675        protocol_system: &'a str,
676        components: &'a HashMap<ComponentId, ProtocolComponent>,
677        contract_ids: &'a [Bytes],
678        block_number: u64,
679    ) -> Self {
680        Self {
681            chain,
682            protocol_system,
683            components,
684            entrypoints: None,
685            contract_ids,
686            block_number,
687            include_balances: true,
688            include_tvl: true,
689        }
690    }
691
692    /// Set whether to include balance information (default: true)
693    pub fn include_balances(mut self, include_balances: bool) -> Self {
694        self.include_balances = include_balances;
695        self
696    }
697
698    /// Set whether to fetch TVL data (default: true)
699    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
700        self.include_tvl = include_tvl;
701        self
702    }
703
704    pub fn entrypoints(mut self, entrypoints: &'a TracedEntryPoints) -> Self {
705        self.entrypoints = Some(entrypoints);
706        self
707    }
708}
709
710#[derive(Error, Debug)]
711pub enum RPCError {
712    /// The passed tycho url failed to parse.
713    #[error("Failed to parse URL: {0}. Error: {1}")]
714    UrlParsing(String, String),
715
716    /// The request data is not correctly formed.
717    #[error("Failed to format request: {0}")]
718    FormatRequest(String),
719
720    /// Errors forwarded from the HTTP protocol.
721    #[error("Unexpected HTTP client error: {0}")]
722    HttpClient(String, #[source] reqwest::Error),
723
724    /// The response from the server could not be parsed correctly.
725    #[error("Failed to parse response: {0}")]
726    ParseResponse(String),
727
728    /// The requested block is outside the server's retention window.
729    #[error("Snapshot block is stale: {0}")]
730    StaleBlock(String),
731
732    /// The requested extractor does not exist on the server.
733    #[error("Unknown extractor: {0}")]
734    UnknownExtractor(String),
735
736    /// Other fatal errors.
737    #[error("Fatal error: {0}")]
738    Fatal(String),
739
740    #[error("Rate limited until {0:?}")]
741    RateLimited(Option<SystemTime>),
742
743    #[error("Server unreachable: {0}")]
744    ServerUnreachable(String),
745}
746
747impl RPCError {
748    /// Converts an HTTP response body parse failure into the correct `RPCError`.
749    ///
750    /// The tycho server returns plain-text error messages (not JSON) when a requested block falls
751    /// outside its retention window. Detecting these here gives callers a typed signal to retry
752    /// with a more recent block rather than treating it as an unrecoverable parse failure.
753    ///
754    /// NOTE: The string matching below is coupled to the server's error message text. If those
755    /// messages change server-side this silently regresses to `ParseResponse`. Replace with a
756    /// structured error code if the server ever returns typed error responses.
757    fn from_parse_error(err: serde_json::Error, body: &str) -> Self {
758        if body.contains("version is older than") || body.contains("Could not find Block") {
759            RPCError::StaleBlock(body.to_string())
760        } else if body.starts_with("Unknown extractor:") {
761            RPCError::UnknownExtractor(body.to_string())
762        } else {
763            RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
764        }
765    }
766}
767
768#[cfg_attr(test, automock)]
769#[async_trait]
770pub trait RPCClient: Send + Sync {
771    /// Returns whether compression is enabled for requests.
772    fn compression(&self) -> bool;
773
774    /// Retrieves a snapshot of contract state for the given contract addresses.
775    ///
776    /// `block_number` pins the query to a specific block; pass `None` to use the latest state.
777    async fn get_contract_state(
778        &self,
779        params: ContractStateParams,
780    ) -> Result<Page<Vec<Account>>, RPCError>;
781
782    /// Retrieves a snapshot of contract state for a set of contract IDs.
783    ///
784    /// If `chunk_size` is `None`, it defaults to the maximum page size.
785    async fn get_contract_state_paginated(
786        &self,
787        params: ContractStatePaginatedParams,
788    ) -> Result<Vec<Account>, RPCError> {
789        let semaphore = Arc::new(Semaphore::new(params.concurrency));
790
791        // Sort the ids to maximize server-side cache hits
792        let mut sorted_ids = params.contract_ids;
793        sorted_ids.sort();
794
795        let chunk_size = params
796            .chunk_size
797            .unwrap_or(StateRequestBody::effective_max_page_size(self.compression()) as usize);
798
799        let mut tasks = Vec::new();
800        for chunk in sorted_ids.chunks(chunk_size) {
801            let sem = semaphore.clone();
802            let base_params =
803                ContractStateParams::new(params.chain, params.protocol_system.as_str())
804                    .with_contract_ids(chunk.to_vec())
805                    .with_version(params.version.clone())
806                    .with_pagination(0, chunk_size as i64);
807            tasks.push(async move {
808                let _permit = sem
809                    .acquire()
810                    .await
811                    .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
812                self.get_contract_state(base_params)
813                    .await
814            });
815        }
816
817        let pages = try_join_all(tasks).await?;
818
819        let accounts = pages
820            .into_iter()
821            .flat_map(|p| p.into_iter())
822            .collect();
823
824        Ok(accounts)
825    }
826
827    /// Retrieves protocol components matching the given filters.
828    ///
829    /// Pass `component_ids` to filter by specific IDs, or `tvl_gt` to filter by minimum TVL.
830    async fn get_protocol_components(
831        &self,
832        params: ProtocolComponentsParams,
833    ) -> Result<Page<Vec<ProtocolComponent>>, RPCError>;
834
835    /// Retrieves protocol components, fetching all pages automatically.
836    ///
837    /// If `chunk_size` is `None`, it defaults to the maximum page size.
838    async fn get_protocol_components_paginated(
839        &self,
840        params: ProtocolComponentsPaginatedParams,
841    ) -> Result<Vec<ProtocolComponent>, RPCError> {
842        let chain = params.chain;
843        let protocol_system = params.protocol_system;
844        let component_ids = params.component_ids;
845        let tvl_gt = params.tvl_gt;
846        let chunk_size = params.chunk_size;
847        let concurrency = params.concurrency;
848
849        let semaphore = Arc::new(Semaphore::new(concurrency));
850
851        let chunk_size = chunk_size.unwrap_or(
852            ProtocolComponentsRequestBody::effective_max_page_size(self.compression()) as usize,
853        );
854
855        // If a set of component IDs is specified, the maximum return size is already known,
856        // allowing us to pre-compute the number of requests to be made.
857        match component_ids {
858            Some(ids) => {
859                let tasks: Vec<_> =
860                    ids.chunks(chunk_size)
861                        .enumerate()
862                        .map(|(index, chunk)| {
863                            let sem = semaphore.clone();
864                            let mut base =
865                                ProtocolComponentsParams::new(chain, protocol_system.as_str())
866                                    .with_component_ids(chunk.to_vec())
867                                    .with_pagination(index as i64, chunk_size as i64);
868                            if let Some(tvl) = tvl_gt {
869                                base = base.with_tvl_gt(tvl);
870                            }
871                            async move {
872                                let _permit = sem.acquire().await.map_err(|_| {
873                                    RPCError::Fatal("Semaphore dropped".to_string())
874                                })?;
875                                self.get_protocol_components(base).await
876                            }
877                        })
878                        .collect();
879
880                try_join_all(tasks)
881                    .await
882                    .map(|pages| pages.into_iter().flatten().collect())
883            }
884            None => {
885                // If no component ids are specified, we need to make requests based on the total
886                // number of results from the first response.
887                let mut base_params =
888                    ProtocolComponentsParams::new(chain, protocol_system.as_str())
889                        .with_pagination(0, chunk_size as i64);
890                if let Some(tvl) = tvl_gt {
891                    base_params = base_params.with_tvl_gt(tvl);
892                }
893
894                let first_page = self
895                    .get_protocol_components(base_params)
896                    .await?;
897
898                let total_items = first_page.total();
899                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
900
901                let mut all: Vec<ProtocolComponent> = first_page.into_data();
902
903                let mut page = 1;
904                while page < total_pages {
905                    let requests_in_this_iteration = (total_pages - page).min(concurrency as i64);
906
907                    let tasks: Vec<_> = (0..requests_in_this_iteration)
908                        .map(|iter| {
909                            let sem = semaphore.clone();
910                            let mut p =
911                                ProtocolComponentsParams::new(chain, protocol_system.as_str())
912                                    .with_pagination(page + iter, chunk_size as i64);
913                            if let Some(tvl) = tvl_gt {
914                                p = p.with_tvl_gt(tvl);
915                            }
916                            async move {
917                                let _permit = sem.acquire().await.map_err(|_| {
918                                    RPCError::Fatal("Semaphore dropped".to_string())
919                                })?;
920                                self.get_protocol_components(p).await
921                            }
922                        })
923                        .collect();
924
925                    let responses = try_join_all(tasks).await?;
926
927                    for resp in responses {
928                        all.extend(resp);
929                    }
930
931                    page += requests_in_this_iteration;
932                }
933                Ok(all)
934            }
935        }
936    }
937
938    /// Retrieves a page of protocol component states.
939    ///
940    /// `block_number` pins the query to a specific block; pass `None` to use the latest state.
941    async fn get_protocol_states(
942        &self,
943        params: ProtocolStatesParams,
944    ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError>;
945
946    /// Retrieves protocol states for a set of protocol IDs, fetching all pages automatically.
947    ///
948    /// If `chunk_size` is `None`, it defaults to the maximum page size.
949    async fn get_protocol_states_paginated(
950        &self,
951        params: ProtocolStatesPaginatedParams,
952    ) -> Result<Vec<ProtocolComponentState>, RPCError> {
953        let semaphore = Arc::new(Semaphore::new(params.concurrency));
954
955        let chunk_size =
956            params
957                .chunk_size
958                .unwrap_or(
959                    ProtocolStateRequestBody::effective_max_page_size(self.compression()) as usize
960                );
961
962        let tasks: Vec<_> = params
963            .protocol_ids
964            .chunks(chunk_size)
965            .map(|c| {
966                let sem = semaphore.clone();
967                let p = ProtocolStatesParams::new(params.chain, params.protocol_system.as_str())
968                    .with_protocol_ids(c.to_vec())
969                    .with_include_balances(params.include_balances)
970                    .with_version(params.version.clone())
971                    .with_pagination(0, chunk_size as i64);
972                async move {
973                    let _permit = sem
974                        .acquire()
975                        .await
976                        .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
977                    self.get_protocol_states(p).await
978                }
979            })
980            .collect();
981
982        try_join_all(tasks)
983            .await
984            .map(|pages| pages.into_iter().flatten().collect())
985    }
986
987    /// Retrieves a page of tokens.
988    ///
989    /// Use `get_all_tokens` to fetch all matching tokens automatically.
990    async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError>;
991
992    /// Retrieves all tokens matching the given criteria, fetching all pages automatically.
993    ///
994    /// If `chunk_size` is `None`, it defaults to the maximum page size.
995    async fn get_all_tokens(&self, params: AllTokensParams) -> Result<Vec<Token>, RPCError> {
996        let chunk_size = params
997            .chunk_size
998            .unwrap_or(TokensRequestBody::effective_max_page_size(self.compression()) as usize);
999
1000        let semaphore = Arc::new(Semaphore::new(params.concurrency));
1001
1002        let page_size: i64 = chunk_size.try_into().map_err(|_| {
1003            RPCError::FormatRequest("Failed to convert chunk_size into i64".to_string())
1004        })?;
1005
1006        let mut base_params = TokensParams::new(params.chain).with_pagination(0, page_size);
1007        if let Some(q) = params.min_quality {
1008            base_params = base_params.with_min_quality(q);
1009        }
1010        if let Some(d) = params.traded_n_days_ago {
1011            base_params = base_params.with_traded_n_days_ago(d);
1012        }
1013
1014        let first_page = self.get_tokens(base_params).await?;
1015        let total_pages = (first_page.total() as f64 / chunk_size as f64).ceil() as i64;
1016
1017        let mut all_tokens: Vec<Token> = first_page.into_data();
1018
1019        if total_pages <= 1 {
1020            return Ok(all_tokens);
1021        }
1022
1023        let tasks: Vec<_> = (1..total_pages)
1024            .map(|page| {
1025                let sem = semaphore.clone();
1026                let mut p = TokensParams::new(params.chain).with_pagination(page, page_size);
1027                if let Some(q) = params.min_quality {
1028                    p = p.with_min_quality(q);
1029                }
1030                if let Some(d) = params.traded_n_days_ago {
1031                    p = p.with_traded_n_days_ago(d);
1032                }
1033                async move {
1034                    let _permit = sem
1035                        .acquire()
1036                        .await
1037                        .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1038                    self.get_tokens(p).await
1039                }
1040            })
1041            .collect();
1042
1043        let pages = try_join_all(tasks).await?;
1044        for page in pages {
1045            all_tokens.extend(page);
1046        }
1047
1048        Ok(all_tokens)
1049    }
1050
1051    /// Retrieves the protocol systems known to the server.
1052    async fn get_protocol_systems(
1053        &self,
1054        params: ProtocolSystemsParams,
1055    ) -> Result<Page<ProtocolSystems>, RPCError>;
1056
1057    /// Retrieves component TVL values.
1058    ///
1059    /// Filter by `component_ids` or by `protocol_system`; both are optional.
1060    async fn get_component_tvl(
1061        &self,
1062        params: ComponentTvlParams,
1063    ) -> Result<Page<HashMap<String, f64>>, RPCError>;
1064
1065    /// Retrieves component TVL values, fetching all pages automatically.
1066    ///
1067    /// If `chunk_size` is `None`, it defaults to the maximum page size.
1068    async fn get_component_tvl_paginated(
1069        &self,
1070        params: ComponentTvlPaginatedParams,
1071    ) -> Result<HashMap<String, f64>, RPCError> {
1072        let semaphore = Arc::new(Semaphore::new(params.concurrency));
1073
1074        let chunk_size =
1075            params
1076                .chunk_size
1077                .unwrap_or(
1078                    ComponentTvlRequestBody::effective_max_page_size(self.compression()) as usize
1079                );
1080
1081        match params.component_ids {
1082            Some(ids) => {
1083                let tasks: Vec<_> =
1084                    ids.chunks(chunk_size)
1085                        .enumerate()
1086                        .map(|(index, chunk)| {
1087                            let sem = semaphore.clone();
1088                            let mut p = ComponentTvlParams::new(params.chain)
1089                                .with_component_ids(chunk.to_vec())
1090                                .with_pagination(index as i64, chunk_size as i64);
1091                            if let Some(ref ps) = params.protocol_system {
1092                                p = p.with_protocol_system(ps.as_str());
1093                            }
1094                            async move {
1095                                let _permit = sem.acquire().await.map_err(|_| {
1096                                    RPCError::Fatal("Semaphore dropped".to_string())
1097                                })?;
1098                                self.get_component_tvl(p).await
1099                            }
1100                        })
1101                        .collect();
1102
1103                let pages = try_join_all(tasks).await?;
1104
1105                let mut merged_tvl = HashMap::new();
1106                for page in pages {
1107                    for (key, value) in page {
1108                        *merged_tvl.entry(key).or_insert(0.0) = value;
1109                    }
1110                }
1111
1112                Ok(merged_tvl)
1113            }
1114            None => {
1115                let mut base =
1116                    ComponentTvlParams::new(params.chain).with_pagination(0, chunk_size as i64);
1117                if let Some(ref ps) = params.protocol_system {
1118                    base = base.with_protocol_system(ps.as_str());
1119                }
1120
1121                let first_page = self.get_component_tvl(base).await?;
1122                let total_items = first_page.total();
1123                let total_pages = (total_items as f64 / chunk_size as f64).ceil() as i64;
1124
1125                let mut merged_tvl: HashMap<String, f64> = first_page.into_data();
1126
1127                let mut page = 1;
1128                while page < total_pages {
1129                    let requests_in_this_iteration =
1130                        (total_pages - page).min(params.concurrency as i64);
1131
1132                    let tasks: Vec<_> = (0..requests_in_this_iteration)
1133                        .map(|i| {
1134                            let sem = semaphore.clone();
1135                            let mut p = ComponentTvlParams::new(params.chain)
1136                                .with_pagination(page + i, chunk_size as i64);
1137                            if let Some(ref ps) = params.protocol_system {
1138                                p = p.with_protocol_system(ps.as_str());
1139                            }
1140                            async move {
1141                                let _permit = sem.acquire().await.map_err(|_| {
1142                                    RPCError::Fatal("Semaphore dropped".to_string())
1143                                })?;
1144                                self.get_component_tvl(p).await
1145                            }
1146                        })
1147                        .collect();
1148
1149                    let responses = try_join_all(tasks).await?;
1150
1151                    for resp in responses {
1152                        for (key, value) in resp {
1153                            *merged_tvl.entry(key).or_insert(0.0) = value;
1154                        }
1155                    }
1156
1157                    page += requests_in_this_iteration;
1158                }
1159
1160                Ok(merged_tvl)
1161            }
1162        }
1163    }
1164
1165    /// Retrieves a page of traced entry points.
1166    ///
1167    /// Use `get_traced_entry_points_paginated` to fetch all pages automatically.
1168    async fn get_traced_entry_points(
1169        &self,
1170        params: TracedEntryPointsParams,
1171    ) -> Result<Page<TracedEntryPoints>, RPCError>;
1172
1173    /// Retrieves traced entry points for a set of component IDs, fetching all pages automatically.
1174    ///
1175    /// If `chunk_size` is `None`, it defaults to the maximum page size.
1176    async fn get_traced_entry_points_paginated(
1177        &self,
1178        params: TracedEntryPointsPaginatedParams,
1179    ) -> Result<TracedEntryPoints, RPCError> {
1180        let chain = params.chain;
1181        let protocol_system = params.protocol_system;
1182        let component_ids = params.component_ids;
1183        let chunk_size = params.chunk_size;
1184        let concurrency = params.concurrency;
1185
1186        let semaphore = Arc::new(Semaphore::new(concurrency));
1187
1188        let chunk_size = chunk_size.unwrap_or(
1189            TracedEntryPointRequestBody::effective_max_page_size(self.compression()) as usize,
1190        );
1191
1192        let tasks: Vec<_> = component_ids
1193            .chunks(chunk_size)
1194            .map(|c| {
1195                let sem = semaphore.clone();
1196                let params = TracedEntryPointsParams::new(chain, protocol_system.as_str())
1197                    .with_component_ids(c.to_vec())
1198                    .with_pagination(0, chunk_size as i64);
1199                async move {
1200                    let _permit = sem
1201                        .acquire()
1202                        .await
1203                        .map_err(|_| RPCError::Fatal("Semaphore dropped".to_string()))?;
1204                    self.get_traced_entry_points(params)
1205                        .await
1206                }
1207            })
1208            .collect();
1209
1210        try_join_all(tasks)
1211            .await
1212            .map(|pages| pages.into_iter().flatten().collect())
1213    }
1214
1215    #[allow(clippy::extra_unused_lifetimes)]
1216    async fn get_snapshots<'a>(
1217        &self,
1218        request: &SnapshotParameters<'a>,
1219        chunk_size: Option<usize>,
1220        concurrency: usize,
1221    ) -> Result<Snapshot, RPCError>;
1222}
1223
1224/// Configuration options for HttpRPCClient
1225#[derive(Debug, Clone)]
1226pub struct HttpRPCClientOptions {
1227    /// Optional API key for authentication
1228    pub auth_key: Option<String>,
1229    /// Enable compression for requests (default: true)
1230    /// When enabled, adds Accept-Encoding: zstd header
1231    pub compression: bool,
1232}
1233
1234impl Default for HttpRPCClientOptions {
1235    fn default() -> Self {
1236        Self::new()
1237    }
1238}
1239
1240impl HttpRPCClientOptions {
1241    /// Create new options with default values (compression enabled)
1242    pub fn new() -> Self {
1243        Self { auth_key: None, compression: true }
1244    }
1245
1246    /// Set the authentication key
1247    pub fn with_auth_key(mut self, auth_key: Option<String>) -> Self {
1248        self.auth_key = auth_key;
1249        self
1250    }
1251
1252    /// Set whether to enable compression (default: true)
1253    pub fn with_compression(mut self, compression: bool) -> Self {
1254        self.compression = compression;
1255        self
1256    }
1257}
1258
1259#[derive(Debug, Clone)]
1260pub struct HttpRPCClient {
1261    http_client: Client,
1262    url: Url,
1263    retry_after: Arc<RwLock<Option<SystemTime>>>,
1264    backoff_policy: ExponentialBackoff,
1265    server_restart_duration: Duration,
1266    compression: bool,
1267}
1268
1269impl HttpRPCClient {
1270    pub fn new(base_uri: &str, options: HttpRPCClientOptions) -> Result<Self, RPCError> {
1271        let uri = base_uri
1272            .parse::<Url>()
1273            .map_err(|e| RPCError::UrlParsing(base_uri.to_string(), e.to_string()))?;
1274
1275        // Add default headers
1276        let mut headers = header::HeaderMap::new();
1277        headers.insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json"));
1278        let user_agent = format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION"));
1279        headers.insert(
1280            header::USER_AGENT,
1281            header::HeaderValue::from_str(&user_agent)
1282                .map_err(|e| RPCError::FormatRequest(format!("Invalid user agent format: {e}")))?,
1283        );
1284
1285        // Add Authorization if one is given
1286        if let Some(key) = options.auth_key.as_deref() {
1287            let mut auth_value = header::HeaderValue::from_str(key).map_err(|e| {
1288                RPCError::FormatRequest(format!("Invalid authorization key format: {e}"))
1289            })?;
1290            auth_value.set_sensitive(true);
1291            headers.insert(header::AUTHORIZATION, auth_value);
1292        }
1293
1294        let mut client_builder = ClientBuilder::new()
1295            .default_headers(headers)
1296            .http2_prior_knowledge();
1297
1298        // When compression is disabled, turn off all automatic compression
1299        if !options.compression {
1300            client_builder = client_builder.no_zstd();
1301        }
1302
1303        let client = client_builder
1304            .build()
1305            .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1306
1307        Ok(Self {
1308            http_client: client,
1309            url: uri,
1310            retry_after: Arc::new(RwLock::new(None)),
1311            backoff_policy: ExponentialBackoffBuilder::new()
1312                .with_initial_interval(Duration::from_millis(250))
1313                // increase backoff time by 75% each failure
1314                .with_multiplier(1.75)
1315                // keep retrying every 30s
1316                .with_max_interval(Duration::from_secs(30))
1317                // if all retries take longer than 2m, give up
1318                .with_max_elapsed_time(Some(Duration::from_secs(125)))
1319                .build(),
1320            server_restart_duration: Duration::from_secs(120),
1321            compression: options.compression,
1322        })
1323    }
1324
1325    #[cfg(test)]
1326    pub fn with_test_backoff_policy(mut self) -> Self {
1327        // Extremely short intervals for very fast testing
1328        self.backoff_policy = ExponentialBackoffBuilder::new()
1329            .with_initial_interval(Duration::from_millis(1))
1330            .with_multiplier(1.1)
1331            .with_max_interval(Duration::from_millis(5))
1332            .with_max_elapsed_time(Some(Duration::from_millis(50)))
1333            .build();
1334        self.server_restart_duration = Duration::from_millis(50);
1335        self
1336    }
1337
1338    /// Converts a error response to a Result.
1339    ///
1340    /// Raises an error if the response status code id 429, 502, 503 or 504. In the 429
1341    /// case it will try to look for a retry-after header an parse it accordingly. The
1342    /// parsed value is then passed as part of the error.
1343    async fn error_for_response(
1344        &self,
1345        response: reqwest::Response,
1346    ) -> Result<reqwest::Response, RPCError> {
1347        match response.status() {
1348            StatusCode::TOO_MANY_REQUESTS => {
1349                let retry_after_raw = response
1350                    .headers()
1351                    .get(reqwest::header::RETRY_AFTER)
1352                    .and_then(|h| h.to_str().ok())
1353                    .and_then(parse_retry_value);
1354
1355                let reason = response
1356                    .text()
1357                    .await
1358                    .unwrap_or_default();
1359                warn!(reason, retry_after = ?retry_after_raw, "Rate limited by server");
1360
1361                Err(RPCError::RateLimited(retry_after_raw))
1362            }
1363            StatusCode::BAD_GATEWAY |
1364            StatusCode::SERVICE_UNAVAILABLE |
1365            StatusCode::GATEWAY_TIMEOUT => Err(RPCError::ServerUnreachable(
1366                response
1367                    .text()
1368                    .await
1369                    .unwrap_or_else(|_| "Server Unreachable".to_string()),
1370            )),
1371            _ => Ok(response),
1372        }
1373    }
1374
1375    /// Classifies errors into transient or permanent ones.
1376    ///
1377    /// Transient errors are retried with a potential backoff, permanent ones are not.
1378    /// If the error is RateLimited, this method will set the self.retry_after value so
1379    /// future requests wait until the rate limit has been reset.
1380    async fn handle_error_for_backoff(&self, e: RPCError) -> backoff::Error<RPCError> {
1381        match e {
1382            RPCError::ServerUnreachable(_) => {
1383                backoff::Error::retry_after(e, self.server_restart_duration)
1384            }
1385            RPCError::RateLimited(Some(until)) => {
1386                let mut retry_after_guard = self.retry_after.write().await;
1387                *retry_after_guard = Some(
1388                    retry_after_guard
1389                        .unwrap_or(until)
1390                        .max(until),
1391                );
1392
1393                if let Ok(duration) = until.duration_since(SystemTime::now()) {
1394                    backoff::Error::retry_after(e, duration)
1395                } else {
1396                    e.into()
1397                }
1398            }
1399            RPCError::RateLimited(None) => e.into(),
1400            _ => backoff::Error::permanent(e),
1401        }
1402    }
1403
1404    /// Waits until the current rate limit time has passed.
1405    ///
1406    /// Only waits if there is a time and that time is in the future, else return
1407    /// immediately.
1408    async fn wait_until_retry_after(&self) {
1409        if let Some(&until) = self.retry_after.read().await.as_ref() {
1410            let now = SystemTime::now();
1411            if until > now {
1412                if let Ok(duration) = until.duration_since(now) {
1413                    sleep(duration).await
1414                }
1415            }
1416        }
1417    }
1418
1419    /// Makes a post request handling transient failures.
1420    ///
1421    /// If a retry-after header is received it will be respected. Else the configured
1422    /// backoff policy is used to deal with transient network or server errors.
1423    async fn make_post_request<T: Serialize + ?Sized>(
1424        &self,
1425        request: &T,
1426        uri: &String,
1427    ) -> Result<Response, RPCError> {
1428        self.wait_until_retry_after().await;
1429        let response = backoff::future::retry(self.backoff_policy.clone(), || async {
1430            let server_response = self
1431                .http_client
1432                .post(uri)
1433                .json(request)
1434                .send()
1435                .await
1436                .map_err(|e| RPCError::HttpClient(e.to_string(), e))?;
1437
1438            match self
1439                .error_for_response(server_response)
1440                .await
1441            {
1442                Ok(response) => Ok(response),
1443                Err(e) => Err(self.handle_error_for_backoff(e).await),
1444            }
1445        })
1446        .await?;
1447        Ok(response)
1448    }
1449}
1450
1451fn parse_retry_value(val: &str) -> Option<SystemTime> {
1452    if let Ok(secs) = val.parse::<u64>() {
1453        return Some(SystemTime::now() + Duration::from_secs(secs));
1454    }
1455    if let Ok(date) = OffsetDateTime::parse(val, &Rfc2822) {
1456        return Some(date.into());
1457    }
1458    None
1459}
1460
1461#[async_trait]
1462impl RPCClient for HttpRPCClient {
1463    fn compression(&self) -> bool {
1464        self.compression
1465    }
1466
1467    #[instrument(skip(self))]
1468    async fn get_contract_state(
1469        &self,
1470        params: ContractStateParams,
1471    ) -> Result<Page<Vec<Account>>, RPCError> {
1472        if params
1473            .contract_ids
1474            .as_ref()
1475            .is_none_or(|ids| ids.is_empty())
1476        {
1477            warn!("No contract ids specified in request.");
1478        }
1479
1480        let request = StateRequestBody {
1481            contract_ids: params.contract_ids,
1482            protocol_system: params.protocol_system,
1483            chain: params.chain.into(),
1484            version: params.version,
1485            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1486        };
1487
1488        let uri = format!(
1489            "{}/{}/contract_state",
1490            self.url
1491                .to_string()
1492                .trim_end_matches('/'),
1493            TYCHO_SERVER_VERSION
1494        );
1495        debug!(%uri, "Sending contract_state request to Tycho server");
1496        trace!(?request, "Sending request to Tycho server");
1497        let response = self
1498            .make_post_request(&request, &uri)
1499            .await?;
1500        trace!(?response, "Received response from Tycho server");
1501
1502        let body = response
1503            .text()
1504            .await
1505            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1506        if body.is_empty() {
1507            // Pure native protocols will return empty contract states
1508            return Ok(Page::new(vec![], 0, 0, 0));
1509        }
1510
1511        let dto_response = serde_json::from_str::<StateRequestResponse>(&body)
1512            .map_err(|err| RPCError::from_parse_error(err, &body))?;
1513        trace!(?dto_response, "Received contract_state response from Tycho server");
1514
1515        let data: Vec<Account> = dto_response
1516            .accounts
1517            .into_iter()
1518            .map(Account::from)
1519            .collect();
1520        Ok(Page::new(
1521            data,
1522            dto_response.pagination.total,
1523            dto_response.pagination.page,
1524            dto_response.pagination.page_size,
1525        ))
1526    }
1527
1528    async fn get_protocol_components(
1529        &self,
1530        params: ProtocolComponentsParams,
1531    ) -> Result<Page<Vec<ProtocolComponent>>, RPCError> {
1532        let request = ProtocolComponentsRequestBody {
1533            protocol_system: params.protocol_system,
1534            component_ids: params.component_ids,
1535            tvl_gt: params.tvl_gt,
1536            chain: params.chain.into(),
1537            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1538        };
1539
1540        let uri = format!(
1541            "{}/{}/protocol_components",
1542            self.url
1543                .to_string()
1544                .trim_end_matches('/'),
1545            TYCHO_SERVER_VERSION,
1546        );
1547        debug!(%uri, "Sending protocol_components request to Tycho server");
1548        trace!(?request, "Sending request to Tycho server");
1549
1550        let response = self
1551            .make_post_request(&request, &uri)
1552            .await?;
1553
1554        trace!(?response, "Received response from Tycho server");
1555
1556        let body = response
1557            .text()
1558            .await
1559            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1560        let dto_response = serde_json::from_str::<ProtocolComponentRequestResponse>(&body)
1561            .map_err(|err| RPCError::from_parse_error(err, &body))?;
1562        trace!(?dto_response, "Received protocol_components response from Tycho server");
1563
1564        let data: Vec<ProtocolComponent> = dto_response
1565            .protocol_components
1566            .into_iter()
1567            .map(ProtocolComponent::from)
1568            .collect();
1569        Ok(Page::new(
1570            data,
1571            dto_response.pagination.total,
1572            dto_response.pagination.page,
1573            dto_response.pagination.page_size,
1574        ))
1575    }
1576
1577    async fn get_protocol_states(
1578        &self,
1579        params: ProtocolStatesParams,
1580    ) -> Result<Page<Vec<ProtocolComponentState>>, RPCError> {
1581        if params
1582            .protocol_ids
1583            .as_ref()
1584            .is_none_or(|ids| ids.is_empty())
1585        {
1586            warn!("No protocol ids specified in request.");
1587        }
1588
1589        let request = ProtocolStateRequestBody {
1590            protocol_ids: params.protocol_ids,
1591            protocol_system: params.protocol_system,
1592            chain: params.chain.into(),
1593            include_balances: params.include_balances,
1594            version: params.version,
1595            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1596        };
1597
1598        let uri = format!(
1599            "{}/{}/protocol_state",
1600            self.url
1601                .to_string()
1602                .trim_end_matches('/'),
1603            TYCHO_SERVER_VERSION
1604        );
1605        debug!(%uri, "Sending protocol_states request to Tycho server");
1606        trace!(?request, "Sending request to Tycho server");
1607
1608        let response = self
1609            .make_post_request(&request, &uri)
1610            .await?;
1611        trace!(?response, "Received response from Tycho server");
1612
1613        let body = response
1614            .text()
1615            .await
1616            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1617
1618        if body.is_empty() {
1619            // Pure VM protocols will return empty states
1620            return Ok(Page::new(vec![], 0, 0, 0));
1621        }
1622
1623        let dto_response = serde_json::from_str::<ProtocolStateRequestResponse>(&body)
1624            .map_err(|err| RPCError::from_parse_error(err, &body))?;
1625        trace!(?dto_response, "Received protocol_states response from Tycho server");
1626
1627        let data: Vec<ProtocolComponentState> = dto_response
1628            .states
1629            .into_iter()
1630            .map(ProtocolComponentState::from)
1631            .collect();
1632        Ok(Page::new(
1633            data,
1634            dto_response.pagination.total,
1635            dto_response.pagination.page,
1636            dto_response.pagination.page_size,
1637        ))
1638    }
1639
1640    async fn get_tokens(&self, params: TokensParams) -> Result<Page<Vec<Token>>, RPCError> {
1641        let request = TokensRequestBody {
1642            token_addresses: None,
1643            min_quality: params.min_quality,
1644            traded_n_days_ago: params.traded_n_days_ago,
1645            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1646            chain: params.chain.into(),
1647        };
1648
1649        let uri = format!(
1650            "{}/{}/tokens",
1651            self.url
1652                .to_string()
1653                .trim_end_matches('/'),
1654            TYCHO_SERVER_VERSION
1655        );
1656        debug!(%uri, "Sending tokens request to Tycho server");
1657
1658        let response = self
1659            .make_post_request(&request, &uri)
1660            .await?;
1661
1662        let body = response
1663            .text()
1664            .await
1665            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1666        let dto_response = serde_json::from_str::<TokensRequestResponse>(&body)
1667            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1668
1669        let data: Vec<Token> = dto_response
1670            .tokens
1671            .into_iter()
1672            .map(Token::from)
1673            .collect();
1674        Ok(Page::new(
1675            data,
1676            dto_response.pagination.total,
1677            dto_response.pagination.page,
1678            dto_response.pagination.page_size,
1679        ))
1680    }
1681
1682    async fn get_protocol_systems(
1683        &self,
1684        params: ProtocolSystemsParams,
1685    ) -> Result<Page<ProtocolSystems>, RPCError> {
1686        let request = ProtocolSystemsRequestBody {
1687            chain: params.chain.into(),
1688            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1689        };
1690
1691        let uri = format!(
1692            "{}/{}/protocol_systems",
1693            self.url
1694                .to_string()
1695                .trim_end_matches('/'),
1696            TYCHO_SERVER_VERSION
1697        );
1698        debug!(%uri, "Sending protocol_systems request to Tycho server");
1699        trace!(?request, "Sending request to Tycho server");
1700        let response = self
1701            .make_post_request(&request, &uri)
1702            .await?;
1703        trace!(?response, "Received response from Tycho server");
1704        let body = response
1705            .text()
1706            .await
1707            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1708        let dto = serde_json::from_str::<ProtocolSystemsRequestResponse>(&body)
1709            .map_err(|err| RPCError::ParseResponse(format!("Error: {err}, Body: {body}")))?;
1710        trace!(?dto, "Received protocol_systems response from Tycho server");
1711        Ok(Page::new(
1712            ProtocolSystems::new(dto.protocol_systems, dto.dci_protocols),
1713            dto.pagination.total,
1714            dto.pagination.page,
1715            dto.pagination.page_size,
1716        ))
1717    }
1718
1719    async fn get_component_tvl(
1720        &self,
1721        params: ComponentTvlParams,
1722    ) -> Result<Page<HashMap<String, f64>>, RPCError> {
1723        let request = ComponentTvlRequestBody {
1724            chain: params.chain.into(),
1725            protocol_system: params.protocol_system,
1726            component_ids: params.component_ids,
1727            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1728        };
1729
1730        let uri = format!(
1731            "{}/{}/component_tvl",
1732            self.url
1733                .to_string()
1734                .trim_end_matches('/'),
1735            TYCHO_SERVER_VERSION
1736        );
1737        debug!(%uri, "Sending get_component_tvl request to Tycho server");
1738        trace!(?request, "Sending request to Tycho server");
1739        let response = self
1740            .make_post_request(&request, &uri)
1741            .await?;
1742        trace!(?response, "Received response from Tycho server");
1743        let body = response
1744            .text()
1745            .await
1746            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1747        let dto_response =
1748            serde_json::from_str::<ComponentTvlRequestResponse>(&body).map_err(|err| {
1749                error!("Failed to parse component_tvl response: {:?}", &body);
1750                RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1751            })?;
1752        trace!(?dto_response, "Received component_tvl response from Tycho server");
1753        Ok(Page::new(
1754            dto_response.tvl,
1755            dto_response.pagination.total,
1756            dto_response.pagination.page,
1757            dto_response.pagination.page_size,
1758        ))
1759    }
1760
1761    async fn get_traced_entry_points(
1762        &self,
1763        params: TracedEntryPointsParams,
1764    ) -> Result<Page<TracedEntryPoints>, RPCError> {
1765        let request = TracedEntryPointRequestBody {
1766            chain: params.chain.into(),
1767            protocol_system: params.protocol_system,
1768            component_ids: params.component_ids,
1769            pagination: PaginationParams { page: params.page, page_size: params.page_size },
1770        };
1771
1772        let uri = format!(
1773            "{}/{TYCHO_SERVER_VERSION}/traced_entry_points",
1774            self.url
1775                .to_string()
1776                .trim_end_matches('/')
1777        );
1778        debug!(%uri, "Sending traced_entry_points request to Tycho server");
1779        trace!(?request, "Sending request to Tycho server");
1780
1781        let response = self
1782            .make_post_request(&request, &uri)
1783            .await?;
1784
1785        trace!(?response, "Received response from Tycho server");
1786
1787        let body = response
1788            .text()
1789            .await
1790            .map_err(|e| RPCError::ParseResponse(e.to_string()))?;
1791        let dto_response =
1792            serde_json::from_str::<TracedEntryPointRequestResponse>(&body).map_err(|err| {
1793                error!("Failed to parse traced_entry_points response: {:?}", &body);
1794                RPCError::ParseResponse(format!("Error: {err}, Body: {body}"))
1795            })?;
1796        trace!(?dto_response, "Received traced_entry_points response from Tycho server");
1797        let data: TracedEntryPoints = dto_response
1798            .traced_entry_points
1799            .into_iter()
1800            .map(|(k, v)| {
1801                (
1802                    k,
1803                    v.into_iter()
1804                        .map(|(ep, tr)| {
1805                            (EntryPointWithTracingParams::from(ep), TracingResult::from(tr))
1806                        })
1807                        .collect(),
1808                )
1809            })
1810            .collect();
1811        Ok(Page::new(
1812            data,
1813            dto_response.pagination.total,
1814            dto_response.pagination.page,
1815            dto_response.pagination.page_size,
1816        ))
1817    }
1818
1819    #[allow(clippy::extra_unused_lifetimes)]
1820    async fn get_snapshots<'a>(
1821        &self,
1822        request: &SnapshotParameters<'a>,
1823        chunk_size: Option<usize>,
1824        concurrency: usize,
1825    ) -> Result<Snapshot, RPCError> {
1826        let component_ids: Vec<_> = request
1827            .components
1828            .keys()
1829            .cloned()
1830            .collect();
1831
1832        let component_tvl = if request.include_tvl && !component_ids.is_empty() {
1833            self.get_component_tvl_paginated(
1834                ComponentTvlPaginatedParams::new(request.chain, concurrency)
1835                    .with_component_ids(component_ids.clone()),
1836            )
1837            .await?
1838        } else {
1839            HashMap::new()
1840        };
1841
1842        let version = VersionParam::at_block(request.chain.into(), request.block_number);
1843
1844        let mut protocol_states = if !component_ids.is_empty() {
1845            self.get_protocol_states_paginated(
1846                ProtocolStatesPaginatedParams::new(
1847                    request.chain,
1848                    request.protocol_system,
1849                    concurrency,
1850                )
1851                .with_protocol_ids(component_ids.clone())
1852                .with_include_balances(request.include_balances)
1853                .with_version(version.clone()),
1854            )
1855            .await?
1856            .into_iter()
1857            .map(|state| (state.component_id.clone(), state))
1858            .collect()
1859        } else {
1860            HashMap::new()
1861        };
1862
1863        // Convert to ComponentWithState, which includes entrypoint information.
1864        let states = request
1865            .components
1866            .values()
1867            .filter_map(|component| {
1868                if let Some(state) = protocol_states.remove(&component.id) {
1869                    Some((
1870                        component.id.clone(),
1871                        ComponentWithState {
1872                            state,
1873                            component: component.clone(),
1874                            component_tvl: component_tvl
1875                                .get(&component.id)
1876                                .cloned(),
1877                            entrypoints: request
1878                                .entrypoints
1879                                .as_ref()
1880                                .and_then(|map| map.get(&component.id))
1881                                .cloned()
1882                                .unwrap_or_default(),
1883                        },
1884                    ))
1885                } else if component_ids.contains(&component.id) {
1886                    // only emit error event if we requested this component
1887                    let component_id = &component.id;
1888                    error!(?component_id, "Missing state for native component!");
1889                    None
1890                } else {
1891                    None
1892                }
1893            })
1894            .collect();
1895
1896        let vm_storage = if !request.contract_ids.is_empty() {
1897            let mut cp_params = ContractStatePaginatedParams::new(
1898                request.chain,
1899                request.protocol_system,
1900                concurrency,
1901            )
1902            .with_contract_ids(request.contract_ids.to_vec())
1903            .with_version(version.clone());
1904            if let Some(cs) = chunk_size {
1905                cp_params = cp_params.with_chunk_size(cs);
1906            }
1907            let contract_states = self
1908                .get_contract_state_paginated(cp_params)
1909                .await?
1910                .into_iter()
1911                .map(|acc| (acc.address.clone(), acc))
1912                .collect::<HashMap<_, _>>();
1913
1914            trace!(states=?&contract_states, "Retrieved ContractState");
1915
1916            let contract_address_to_components = request
1917                .components
1918                .iter()
1919                .filter_map(|(id, comp)| {
1920                    if component_ids.contains(id) {
1921                        Some(
1922                            comp.contract_addresses
1923                                .iter()
1924                                .map(|address| (address.clone(), comp.id.clone())),
1925                        )
1926                    } else {
1927                        None
1928                    }
1929                })
1930                .flatten()
1931                .fold(HashMap::<Bytes, Vec<String>>::new(), |mut acc, (addr, c_id)| {
1932                    acc.entry(addr).or_default().push(c_id);
1933                    acc
1934                });
1935
1936            request
1937                .contract_ids
1938                .iter()
1939                .filter_map(|address| {
1940                    if let Some(state) = contract_states.get(address) {
1941                        Some((address.clone(), state.clone()))
1942                    } else if let Some(ids) = contract_address_to_components.get(address) {
1943                        // only emit error even if we did actually request this address
1944                        error!(
1945                            ?address,
1946                            ?ids,
1947                            "Component with lacking contract storage encountered!"
1948                        );
1949                        None
1950                    } else {
1951                        None
1952                    }
1953                })
1954                .collect()
1955        } else {
1956            HashMap::new()
1957        };
1958
1959        Ok(Snapshot { states, vm_storage })
1960    }
1961}
1962
1963#[cfg(test)]
1964mod tests {
1965    use std::{
1966        collections::{HashMap, HashSet},
1967        str::FromStr,
1968    };
1969
1970    use mockito::Server;
1971    use rstest::rstest;
1972    use tycho_common::models::blockchain::AddressStorageLocation;
1973
1974    use super::*;
1975
1976    // Dummy implementation of `get_protocol_states_paginated` for backwards compatibility testing
1977    // purposes
1978    impl MockRPCClient {
1979        #[allow(clippy::too_many_arguments)]
1980        async fn test_get_protocol_states_paginated<T>(
1981            &self,
1982            chain: Chain,
1983            ids: &[T],
1984            protocol_system: &str,
1985            include_balances: bool,
1986            block_number: Option<u64>,
1987            chunk_size: usize,
1988            _concurrency: usize,
1989        ) -> Vec<(Chain, Vec<String>, String, bool, Option<u64>, PaginationParams)>
1990        where
1991            T: AsRef<str> + Clone + Send + Sync + 'static,
1992        {
1993            ids.chunks(chunk_size)
1994                .map(|chunk| {
1995                    (
1996                        chain,
1997                        chunk
1998                            .iter()
1999                            .map(|id| id.as_ref().to_string())
2000                            .collect(),
2001                        protocol_system.to_string(),
2002                        include_balances,
2003                        block_number,
2004                        PaginationParams { page: 0, page_size: chunk_size as i64 },
2005                    )
2006                })
2007                .collect()
2008        }
2009    }
2010
2011    const GET_CONTRACT_STATE_RESP: &str = r#"
2012        {
2013            "accounts": [
2014                {
2015                    "chain": "ethereum",
2016                    "address": "0x0000000000000000000000000000000000000000",
2017                    "title": "",
2018                    "slots": {},
2019                    "native_balance": "0x01f4",
2020                    "token_balances": {},
2021                    "code": "0x00",
2022                    "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
2023                    "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2024                    "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2025                    "creation_tx": null
2026                }
2027            ],
2028            "pagination": {
2029                "page": 0,
2030                "page_size": 20,
2031                "total": 10
2032            }
2033        }
2034        "#;
2035
2036    #[rstest]
2037    #[case::string_input(vec![
2038        "id1".to_string(),
2039        "id2".to_string()
2040    ])]
2041    #[tokio::test]
2042    async fn test_get_protocol_states_paginated<T>(#[case] ids: Vec<T>)
2043    where
2044        T: AsRef<str> + Clone + Send + Sync + 'static,
2045    {
2046        let mock_client = MockRPCClient::new();
2047
2048        let request_args = mock_client
2049            .test_get_protocol_states_paginated(
2050                Chain::Ethereum,
2051                &ids,
2052                "test_system",
2053                true,
2054                None,
2055                2,
2056                2,
2057            )
2058            .await;
2059
2060        // Verify that the request args have been split into chunks correctly
2061        assert_eq!(request_args.len(), 1);
2062        assert_eq!(request_args[0].1.len(), 2);
2063    }
2064
2065    #[tokio::test]
2066    async fn test_get_contract_state() {
2067        let mut server = Server::new_async().await;
2068        let server_resp = GET_CONTRACT_STATE_RESP;
2069        // test that the response is deserialized correctly
2070        serde_json::from_str::<StateRequestResponse>(server_resp).expect("deserialize");
2071
2072        let mocked_server = server
2073            .mock("POST", "/v1/contract_state")
2074            .expect(1)
2075            .with_body(server_resp)
2076            .create_async()
2077            .await;
2078
2079        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2080            .expect("create client");
2081
2082        let accounts = client
2083            .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
2084            .await
2085            .expect("get state");
2086
2087        mocked_server.assert();
2088        assert_eq!(accounts.data().len(), 1);
2089        assert_eq!(accounts.data()[0].slots, HashMap::new());
2090        assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
2091        assert_eq!(accounts.data()[0].code, [0].to_vec());
2092        assert_eq!(
2093            accounts.data()[0].code_hash,
2094            hex::decode("5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e")
2095                .unwrap()
2096        );
2097    }
2098
2099    #[tokio::test]
2100    async fn test_get_protocol_components() {
2101        let mut server = Server::new_async().await;
2102        let server_resp = r#"
2103        {
2104            "protocol_components": [
2105                {
2106                    "id": "State1",
2107                    "protocol_system": "ambient",
2108                    "protocol_type_name": "Pool",
2109                    "chain": "ethereum",
2110                    "tokens": [
2111                        "0x0000000000000000000000000000000000000000",
2112                        "0x0000000000000000000000000000000000000001"
2113                    ],
2114                    "contract_ids": [
2115                        "0x0000000000000000000000000000000000000000"
2116                    ],
2117                    "static_attributes": {
2118                        "attribute_1": "0x00000000000003e8"
2119                    },
2120                    "change": "Creation",
2121                    "creation_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
2122                    "created_at": "2022-01-01T00:00:00"
2123                }
2124            ],
2125            "pagination": {
2126                "page": 0,
2127                "page_size": 20,
2128                "total": 10
2129            }
2130        }
2131        "#;
2132        // test that the response is deserialized correctly
2133        serde_json::from_str::<ProtocolComponentRequestResponse>(server_resp).expect("deserialize");
2134
2135        let mocked_server = server
2136            .mock("POST", "/v1/protocol_components")
2137            .expect(1)
2138            .with_body(server_resp)
2139            .create_async()
2140            .await;
2141
2142        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2143            .expect("create client");
2144
2145        let components = client
2146            .get_protocol_components(ProtocolComponentsParams::new(Chain::Ethereum, ""))
2147            .await
2148            .expect("get state");
2149
2150        mocked_server.assert();
2151        assert_eq!(components.data().len(), 1);
2152        assert_eq!(components.data()[0].id, "State1");
2153        assert_eq!(components.data()[0].protocol_system, "ambient");
2154        assert_eq!(components.data()[0].protocol_type_name, "Pool");
2155        assert_eq!(components.data()[0].tokens.len(), 2);
2156        let expected_attributes =
2157            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2158                .iter()
2159                .cloned()
2160                .collect::<HashMap<String, Bytes>>();
2161        assert_eq!(components.data()[0].static_attributes, expected_attributes);
2162    }
2163
2164    #[tokio::test]
2165    async fn test_get_protocol_states() {
2166        let mut server = Server::new_async().await;
2167        let server_resp = r#"
2168        {
2169            "states": [
2170                {
2171                    "component_id": "State1",
2172                    "attributes": {
2173                        "attribute_1": "0x00000000000003e8"
2174                    },
2175                    "balances": {
2176                        "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
2177                    }
2178                }
2179            ],
2180            "pagination": {
2181                "page": 0,
2182                "page_size": 20,
2183                "total": 10
2184            }
2185        }
2186        "#;
2187        // test that the response is deserialized correctly
2188        serde_json::from_str::<ProtocolStateRequestResponse>(server_resp).expect("deserialize");
2189
2190        let mocked_server = server
2191            .mock("POST", "/v1/protocol_state")
2192            .expect(1)
2193            .with_body(server_resp)
2194            .create_async()
2195            .await;
2196        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2197            .expect("create client");
2198
2199        let states = client
2200            .get_protocol_states(
2201                ProtocolStatesParams::new(Chain::Ethereum, "").with_include_balances(true),
2202            )
2203            .await
2204            .expect("get state");
2205
2206        mocked_server.assert();
2207        assert_eq!(states.data().len(), 1);
2208        assert_eq!(states.data()[0].component_id, "State1");
2209        let expected_attributes =
2210            [("attribute_1".to_string(), Bytes::from(1000_u64.to_be_bytes()))]
2211                .iter()
2212                .cloned()
2213                .collect::<HashMap<String, Bytes>>();
2214        assert_eq!(states.data()[0].attributes, expected_attributes);
2215        let expected_balances = [(
2216            Bytes::from_str("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
2217                .expect("Unsupported address format"),
2218            Bytes::from_str("0x01f4").unwrap(),
2219        )]
2220        .iter()
2221        .cloned()
2222        .collect::<HashMap<Bytes, Bytes>>();
2223        assert_eq!(states.data()[0].balances, expected_balances);
2224    }
2225
2226    #[tokio::test]
2227    async fn test_get_tokens() {
2228        let mut server = Server::new_async().await;
2229        let server_resp = r#"
2230        {
2231            "tokens": [
2232              {
2233                "chain": "ethereum",
2234                "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
2235                "symbol": "WETH",
2236                "decimals": 18,
2237                "tax": 0,
2238                "gas": [
2239                  29962
2240                ],
2241                "quality": 100
2242              },
2243              {
2244                "chain": "ethereum",
2245                "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
2246                "symbol": "USDC",
2247                "decimals": 6,
2248                "tax": 0,
2249                "gas": [
2250                  40652
2251                ],
2252                "quality": 100
2253              }
2254            ],
2255            "pagination": {
2256              "page": 0,
2257              "page_size": 20,
2258              "total": 10
2259            }
2260          }
2261        "#;
2262        // test that the response is deserialized correctly
2263        serde_json::from_str::<TokensRequestResponse>(server_resp).expect("deserialize");
2264
2265        let mocked_server = server
2266            .mock("POST", "/v1/tokens")
2267            .expect(1)
2268            .with_body(server_resp)
2269            .create_async()
2270            .await;
2271        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2272            .expect("create client");
2273
2274        let tokens = client
2275            .get_tokens(TokensParams::new(Chain::Ethereum))
2276            .await
2277            .expect("get tokens");
2278
2279        let expected = vec![
2280            Token {
2281                chain: tycho_common::models::Chain::Ethereum,
2282                address: Bytes::from_str("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2").unwrap(),
2283                symbol: "WETH".to_string(),
2284                decimals: 18,
2285                tax: 0,
2286                gas: vec![Some(29962)],
2287                quality: 100,
2288            },
2289            Token {
2290                chain: tycho_common::models::Chain::Ethereum,
2291                address: Bytes::from_str("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48").unwrap(),
2292                symbol: "USDC".to_string(),
2293                decimals: 6,
2294                tax: 0,
2295                gas: vec![Some(40652)],
2296                quality: 100,
2297            },
2298        ];
2299
2300        mocked_server.assert();
2301        assert_eq!(*tokens.data(), expected);
2302    }
2303
2304    #[rstest]
2305    #[case::with_dci(Some(vec!["system2"]), vec!["system2"])]
2306    #[case::backward_compat(None, vec![])]
2307    #[tokio::test]
2308    async fn test_get_protocol_systems(
2309        #[case] dci_protocols: Option<Vec<&str>>,
2310        #[case] expected_dci: Vec<&str>,
2311    ) {
2312        use serde_json::json;
2313
2314        let mut json_value = json!({
2315            "protocol_systems": ["system1", "system2"],
2316            "pagination": { "page": 0, "page_size": 20, "total": 2 }
2317        });
2318        if let Some(dci) = dci_protocols {
2319            json_value["dci_protocols"] = json!(dci);
2320        }
2321        let server_resp = serde_json::to_string(&json_value).unwrap();
2322
2323        let mut server = Server::new_async().await;
2324        let mocked_server = server
2325            .mock("POST", "/v1/protocol_systems")
2326            .expect(1)
2327            .with_body(&server_resp)
2328            .create_async()
2329            .await;
2330        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2331            .expect("create client");
2332
2333        let response = client
2334            .get_protocol_systems(ProtocolSystemsParams::new(Chain::Ethereum))
2335            .await
2336            .expect("get protocol systems");
2337
2338        mocked_server.assert();
2339        assert_eq!(response.data().protocol_systems(), ["system1", "system2"]);
2340        assert_eq!(response.data().dci_protocols(), expected_dci.as_slice());
2341    }
2342
2343    #[tokio::test]
2344    async fn test_get_component_tvl() {
2345        let mut server = Server::new_async().await;
2346        let server_resp = r#"
2347        {
2348            "tvl": {
2349                "component1": 100.0
2350            },
2351            "pagination": {
2352                "page": 0,
2353                "page_size": 20,
2354                "total": 10
2355            }
2356        }
2357        "#;
2358        // test that the response is deserialized correctly
2359        serde_json::from_str::<ComponentTvlRequestResponse>(server_resp).expect("deserialize");
2360
2361        let mocked_server = server
2362            .mock("POST", "/v1/component_tvl")
2363            .expect(1)
2364            .with_body(server_resp)
2365            .create_async()
2366            .await;
2367        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2368            .expect("create client");
2369
2370        let component_tvl = client
2371            .get_component_tvl(ComponentTvlParams::new(Chain::Ethereum))
2372            .await
2373            .expect("get component tvl");
2374
2375        mocked_server.assert();
2376        assert_eq!(component_tvl.data().get("component1"), Some(&100.0));
2377    }
2378
2379    #[tokio::test]
2380    async fn test_get_traced_entry_points() {
2381        let mut server = Server::new_async().await;
2382        let server_resp = r#"
2383        {
2384            "traced_entry_points": {
2385                "component_1": [
2386                    [
2387                        {
2388                            "entry_point": {
2389                                "external_id": "entrypoint_a",
2390                                "target": "0x0000000000000000000000000000000000000001",
2391                                "signature": "sig()"
2392                            },
2393                            "params": {
2394                                "method": "rpctracer",
2395                                "caller": "0x000000000000000000000000000000000000000a",
2396                                "calldata": "0x000000000000000000000000000000000000000b"
2397                            }
2398                        },
2399                        {
2400                            "retriggers": [
2401                                [
2402                                    "0x00000000000000000000000000000000000000aa",
2403                                    {"key": "0x0000000000000000000000000000000000000aaa", "offset": 12}
2404                                ]
2405                            ],
2406                            "accessed_slots": {
2407                                "0x0000000000000000000000000000000000aaaa": [
2408                                    "0x0000000000000000000000000000000000aaaa"
2409                                ]
2410                            }
2411                        }
2412                    ]
2413                ]
2414            },
2415            "pagination": {
2416                "page": 0,
2417                "page_size": 20,
2418                "total": 1
2419            }
2420        }
2421        "#;
2422        // test that the response is deserialized correctly
2423        serde_json::from_str::<TracedEntryPointRequestResponse>(server_resp).expect("deserialize");
2424
2425        let mocked_server = server
2426            .mock("POST", "/v1/traced_entry_points")
2427            .expect(1)
2428            .with_body(server_resp)
2429            .create_async()
2430            .await;
2431        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2432            .expect("create client");
2433
2434        let entrypoints = client
2435            .get_traced_entry_points(TracedEntryPointsParams::new(Chain::Ethereum, ""))
2436            .await
2437            .expect("get traced entry points");
2438
2439        mocked_server.assert();
2440        assert_eq!(entrypoints.data().len(), 1);
2441        let comp1_entrypoints = entrypoints
2442            .data()
2443            .get("component_1")
2444            .expect("component_1 entrypoints should exist");
2445        assert_eq!(comp1_entrypoints.len(), 1);
2446
2447        let (entrypoint, trace_result) = &comp1_entrypoints[0];
2448        assert_eq!(entrypoint.entry_point.external_id, "entrypoint_a");
2449        assert_eq!(
2450            entrypoint.entry_point.target,
2451            Bytes::from_str("0x0000000000000000000000000000000000000001").unwrap()
2452        );
2453        assert_eq!(entrypoint.entry_point.signature, "sig()");
2454        let tycho_common::models::blockchain::TracingParams::RPCTracer(rpc_params) =
2455            &entrypoint.params;
2456        assert_eq!(
2457            rpc_params.caller,
2458            Some(Bytes::from("0x000000000000000000000000000000000000000a"))
2459        );
2460        assert_eq!(rpc_params.calldata, Bytes::from("0x000000000000000000000000000000000000000b"));
2461
2462        assert_eq!(
2463            trace_result.retriggers,
2464            HashSet::from([(
2465                Bytes::from("0x00000000000000000000000000000000000000aa"),
2466                AddressStorageLocation::new(
2467                    Bytes::from("0x0000000000000000000000000000000000000aaa"),
2468                    12
2469                )
2470            )])
2471        );
2472        assert_eq!(trace_result.accessed_slots.len(), 1);
2473        assert_eq!(
2474            trace_result.accessed_slots,
2475            HashMap::from([(
2476                Bytes::from("0x0000000000000000000000000000000000aaaa"),
2477                HashSet::from([Bytes::from("0x0000000000000000000000000000000000aaaa")])
2478            )])
2479        );
2480    }
2481
2482    #[tokio::test]
2483    async fn test_parse_retry_value_numeric() {
2484        let result = parse_retry_value("60");
2485        assert!(result.is_some());
2486
2487        let expected_time = SystemTime::now() + Duration::from_secs(60);
2488        let actual_time = result.unwrap();
2489
2490        // Allow for small timing differences during test execution
2491        let diff = if actual_time > expected_time {
2492            actual_time
2493                .duration_since(expected_time)
2494                .unwrap()
2495        } else {
2496            expected_time
2497                .duration_since(actual_time)
2498                .unwrap()
2499        };
2500        assert!(diff < Duration::from_secs(1), "Time difference too large: {:?}", diff);
2501    }
2502
2503    #[tokio::test]
2504    async fn test_parse_retry_value_rfc2822() {
2505        // Use a fixed future date in RFC2822 format
2506        let rfc2822_date = "Sat, 01 Jan 2030 12:00:00 +0000";
2507        let result = parse_retry_value(rfc2822_date);
2508        assert!(result.is_some());
2509
2510        let parsed_time = result.unwrap();
2511        assert!(parsed_time > SystemTime::now());
2512    }
2513
2514    #[tokio::test]
2515    async fn test_parse_retry_value_invalid_formats() {
2516        // Test various invalid formats
2517        assert!(parse_retry_value("invalid").is_none());
2518        assert!(parse_retry_value("").is_none());
2519        assert!(parse_retry_value("not_a_number").is_none());
2520        assert!(parse_retry_value("Mon, 32 Jan 2030 25:00:00 +0000").is_none());
2521        // Invalid date
2522    }
2523
2524    #[tokio::test]
2525    async fn test_parse_retry_value_zero_seconds() {
2526        let result = parse_retry_value("0");
2527        assert!(result.is_some());
2528
2529        let expected_time = SystemTime::now();
2530        let actual_time = result.unwrap();
2531
2532        // Should be very close to current time
2533        let diff = if actual_time > expected_time {
2534            actual_time
2535                .duration_since(expected_time)
2536                .unwrap()
2537        } else {
2538            expected_time
2539                .duration_since(actual_time)
2540                .unwrap()
2541        };
2542        assert!(diff < Duration::from_secs(1));
2543    }
2544
2545    #[tokio::test]
2546    async fn test_error_for_response_rate_limited() {
2547        let mut server = Server::new_async().await;
2548        let mock = server
2549            .mock("GET", "/test")
2550            .with_status(429)
2551            .with_header("Retry-After", "60")
2552            .create_async()
2553            .await;
2554
2555        let client = reqwest::Client::new();
2556        let response = client
2557            .get(format!("{}/test", server.url()))
2558            .send()
2559            .await
2560            .unwrap();
2561
2562        let http_client =
2563            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2564                .unwrap()
2565                .with_test_backoff_policy();
2566        let result = http_client
2567            .error_for_response(response)
2568            .await;
2569
2570        mock.assert();
2571        assert!(matches!(result, Err(RPCError::RateLimited(_))));
2572        if let Err(RPCError::RateLimited(retry_after)) = result {
2573            assert!(retry_after.is_some());
2574        }
2575    }
2576
2577    #[tokio::test]
2578    async fn test_error_for_response_rate_limited_no_header() {
2579        let mut server = Server::new_async().await;
2580        let mock = server
2581            .mock("GET", "/test")
2582            .with_status(429)
2583            .create_async()
2584            .await;
2585
2586        let client = reqwest::Client::new();
2587        let response = client
2588            .get(format!("{}/test", server.url()))
2589            .send()
2590            .await
2591            .unwrap();
2592
2593        let http_client =
2594            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2595                .unwrap()
2596                .with_test_backoff_policy();
2597        let result = http_client
2598            .error_for_response(response)
2599            .await;
2600
2601        mock.assert();
2602        assert!(matches!(result, Err(RPCError::RateLimited(None))));
2603    }
2604
2605    #[tokio::test]
2606    async fn test_error_for_response_server_errors() {
2607        let test_cases =
2608            vec![(502, "Bad Gateway"), (503, "Service Unavailable"), (504, "Gateway Timeout")];
2609
2610        for (status_code, expected_body) in test_cases {
2611            let mut server = Server::new_async().await;
2612            let mock = server
2613                .mock("GET", "/test")
2614                .with_status(status_code)
2615                .with_body(expected_body)
2616                .create_async()
2617                .await;
2618
2619            let client = reqwest::Client::new();
2620            let response = client
2621                .get(format!("{}/test", server.url()))
2622                .send()
2623                .await
2624                .unwrap();
2625
2626            let http_client =
2627                HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2628                    .unwrap()
2629                    .with_test_backoff_policy();
2630            let result = http_client
2631                .error_for_response(response)
2632                .await;
2633
2634            mock.assert();
2635            assert!(matches!(result, Err(RPCError::ServerUnreachable(_))));
2636            if let Err(RPCError::ServerUnreachable(body)) = result {
2637                assert_eq!(body, expected_body);
2638            }
2639        }
2640    }
2641
2642    #[tokio::test]
2643    async fn test_error_for_response_success() {
2644        let mut server = Server::new_async().await;
2645        let mock = server
2646            .mock("GET", "/test")
2647            .with_status(200)
2648            .with_body("success")
2649            .create_async()
2650            .await;
2651
2652        let client = reqwest::Client::new();
2653        let response = client
2654            .get(format!("{}/test", server.url()))
2655            .send()
2656            .await
2657            .unwrap();
2658
2659        let http_client =
2660            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2661                .unwrap()
2662                .with_test_backoff_policy();
2663        let result = http_client
2664            .error_for_response(response)
2665            .await;
2666
2667        mock.assert();
2668        assert!(result.is_ok());
2669
2670        let response = result.unwrap();
2671        assert_eq!(response.status(), 200);
2672    }
2673
2674    #[tokio::test]
2675    async fn test_handle_error_for_backoff_server_unreachable() {
2676        let http_client =
2677            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2678                .unwrap()
2679                .with_test_backoff_policy();
2680        let error = RPCError::ServerUnreachable("Service down".to_string());
2681
2682        let backoff_error = http_client
2683            .handle_error_for_backoff(error)
2684            .await;
2685
2686        match backoff_error {
2687            backoff::Error::Transient { err: RPCError::ServerUnreachable(msg), retry_after } => {
2688                assert_eq!(msg, "Service down");
2689                assert_eq!(retry_after, Some(Duration::from_millis(50))); // Fast test duration
2690            }
2691            _ => panic!("Expected transient error for ServerUnreachable"),
2692        }
2693    }
2694
2695    #[tokio::test]
2696    async fn test_handle_error_for_backoff_rate_limited_with_retry_after() {
2697        let http_client =
2698            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2699                .unwrap()
2700                .with_test_backoff_policy();
2701        let future_time = SystemTime::now() + Duration::from_secs(30);
2702        let error = RPCError::RateLimited(Some(future_time));
2703
2704        let backoff_error = http_client
2705            .handle_error_for_backoff(error)
2706            .await;
2707
2708        match backoff_error {
2709            backoff::Error::Transient { err: RPCError::RateLimited(retry_after), .. } => {
2710                assert_eq!(retry_after, Some(future_time));
2711            }
2712            _ => panic!("Expected transient error for RateLimited"),
2713        }
2714
2715        // Verify that retry_after was stored in the client state
2716        let stored_retry_after = http_client.retry_after.read().await;
2717        assert_eq!(*stored_retry_after, Some(future_time));
2718    }
2719
2720    #[tokio::test]
2721    async fn test_handle_error_for_backoff_rate_limited_no_retry_after() {
2722        let http_client =
2723            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2724                .unwrap()
2725                .with_test_backoff_policy();
2726        let error = RPCError::RateLimited(None);
2727
2728        let backoff_error = http_client
2729            .handle_error_for_backoff(error)
2730            .await;
2731
2732        match backoff_error {
2733            backoff::Error::Transient { err: RPCError::RateLimited(None), .. } => {
2734                // This is expected - no retry-after still allows retries with default policy
2735            }
2736            _ => panic!("Expected transient error for RateLimited without retry-after"),
2737        }
2738    }
2739
2740    #[tokio::test]
2741    async fn test_handle_error_for_backoff_other_errors() {
2742        let http_client =
2743            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2744                .unwrap()
2745                .with_test_backoff_policy();
2746        let error = RPCError::ParseResponse("Invalid JSON".to_string());
2747
2748        let backoff_error = http_client
2749            .handle_error_for_backoff(error)
2750            .await;
2751
2752        match backoff_error {
2753            backoff::Error::Permanent(RPCError::ParseResponse(msg)) => {
2754                assert_eq!(msg, "Invalid JSON");
2755            }
2756            _ => panic!("Expected permanent error for ParseResponse"),
2757        }
2758    }
2759
2760    #[tokio::test]
2761    async fn test_wait_until_retry_after_no_retry_time() {
2762        let http_client =
2763            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2764                .unwrap()
2765                .with_test_backoff_policy();
2766
2767        let start = std::time::Instant::now();
2768        http_client
2769            .wait_until_retry_after()
2770            .await;
2771        let elapsed = start.elapsed();
2772
2773        // Should return immediately if no retry time is set
2774        assert!(elapsed < Duration::from_millis(100));
2775    }
2776
2777    #[tokio::test]
2778    async fn test_wait_until_retry_after_past_time() {
2779        let http_client =
2780            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2781                .unwrap()
2782                .with_test_backoff_policy();
2783
2784        // Set a retry time in the past
2785        let past_time = SystemTime::now() - Duration::from_secs(10);
2786        *http_client.retry_after.write().await = Some(past_time);
2787
2788        let start = std::time::Instant::now();
2789        http_client
2790            .wait_until_retry_after()
2791            .await;
2792        let elapsed = start.elapsed();
2793
2794        // Should return immediately if retry time is in the past
2795        assert!(elapsed < Duration::from_millis(100));
2796    }
2797
2798    #[tokio::test]
2799    async fn test_wait_until_retry_after_future_time() {
2800        let http_client =
2801            HttpRPCClient::new("http://localhost:8080", HttpRPCClientOptions::default())
2802                .unwrap()
2803                .with_test_backoff_policy();
2804
2805        // Set a retry time 100ms in the future
2806        let future_time = SystemTime::now() + Duration::from_millis(100);
2807        *http_client.retry_after.write().await = Some(future_time);
2808
2809        let start = std::time::Instant::now();
2810        http_client
2811            .wait_until_retry_after()
2812            .await;
2813        let elapsed = start.elapsed();
2814
2815        // Should wait approximately the specified duration
2816        assert!(elapsed >= Duration::from_millis(80)); // Allow some tolerance
2817        assert!(elapsed <= Duration::from_millis(200)); // Upper bound for test stability
2818    }
2819
2820    #[tokio::test]
2821    async fn test_make_post_request_success() {
2822        let mut server = Server::new_async().await;
2823        let server_resp = r#"{"success": true}"#;
2824
2825        let mock = server
2826            .mock("POST", "/test")
2827            .with_status(200)
2828            .with_body(server_resp)
2829            .create_async()
2830            .await;
2831
2832        let http_client =
2833            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2834                .unwrap()
2835                .with_test_backoff_policy();
2836        let request_body = serde_json::json!({"test": "data"});
2837        let uri = format!("{}/test", server.url());
2838
2839        let result = http_client
2840            .make_post_request(&request_body, &uri)
2841            .await;
2842
2843        mock.assert();
2844        assert!(result.is_ok());
2845
2846        let response = result.unwrap();
2847        assert_eq!(response.status(), 200);
2848        assert_eq!(response.text().await.unwrap(), server_resp);
2849    }
2850
2851    #[tokio::test]
2852    async fn test_make_post_request_retry_on_server_error() {
2853        let mut server = Server::new_async().await;
2854        // First request fails with 503, second succeeds
2855        let error_mock = server
2856            .mock("POST", "/test")
2857            .with_status(503)
2858            .with_body("Service Unavailable")
2859            .expect(1)
2860            .create_async()
2861            .await;
2862
2863        let success_mock = server
2864            .mock("POST", "/test")
2865            .with_status(200)
2866            .with_body(r#"{"success": true}"#)
2867            .expect(1)
2868            .create_async()
2869            .await;
2870
2871        let http_client =
2872            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2873                .unwrap()
2874                .with_test_backoff_policy();
2875        let request_body = serde_json::json!({"test": "data"});
2876        let uri = format!("{}/test", server.url());
2877
2878        let result = http_client
2879            .make_post_request(&request_body, &uri)
2880            .await;
2881
2882        error_mock.assert();
2883        success_mock.assert();
2884        assert!(result.is_ok());
2885    }
2886
2887    #[tokio::test]
2888    async fn test_make_post_request_respect_retry_after_header() {
2889        let mut server = Server::new_async().await;
2890
2891        // First request returns 429 with retry-after, second succeeds
2892        let rate_limit_mock = server
2893            .mock("POST", "/test")
2894            .with_status(429)
2895            .with_header("Retry-After", "1") // 1 second
2896            .expect(1)
2897            .create_async()
2898            .await;
2899
2900        let success_mock = server
2901            .mock("POST", "/test")
2902            .with_status(200)
2903            .with_body(r#"{"success": true}"#)
2904            .expect(1)
2905            .create_async()
2906            .await;
2907
2908        let http_client =
2909            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2910                .unwrap()
2911                .with_test_backoff_policy();
2912        let request_body = serde_json::json!({"test": "data"});
2913        let uri = format!("{}/test", server.url());
2914
2915        let start = std::time::Instant::now();
2916        let result = http_client
2917            .make_post_request(&request_body, &uri)
2918            .await;
2919        let elapsed = start.elapsed();
2920
2921        rate_limit_mock.assert();
2922        success_mock.assert();
2923        assert!(result.is_ok());
2924
2925        // Should have waited at least 1 second due to retry-after header
2926        assert!(elapsed >= Duration::from_millis(900)); // Allow some tolerance
2927        assert!(elapsed <= Duration::from_millis(2000)); // Upper bound for test stability
2928    }
2929
2930    #[tokio::test]
2931    async fn test_make_post_request_permanent_error() {
2932        let mut server = Server::new_async().await;
2933
2934        let mock = server
2935            .mock("POST", "/test")
2936            .with_status(400) // Bad Request - should not be retried
2937            .with_body("Bad Request")
2938            .expect(1)
2939            .create_async()
2940            .await;
2941
2942        let http_client =
2943            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
2944                .unwrap()
2945                .with_test_backoff_policy();
2946        let request_body = serde_json::json!({"test": "data"});
2947        let uri = format!("{}/test", server.url());
2948
2949        let result = http_client
2950            .make_post_request(&request_body, &uri)
2951            .await;
2952
2953        mock.assert();
2954        assert!(result.is_ok()); // 400 doesn't trigger retry logic, just returns the response
2955
2956        let response = result.unwrap();
2957        assert_eq!(response.status(), 400);
2958    }
2959
2960    #[tokio::test]
2961    async fn test_concurrent_requests_with_different_retry_after() {
2962        let mut server = Server::new_async().await;
2963
2964        // First request gets rate limited with 1 second retry-after
2965        let rate_limit_mock_1 = server
2966            .mock("POST", "/test1")
2967            .with_status(429)
2968            .with_header("Retry-After", "1")
2969            .expect(1)
2970            .create_async()
2971            .await;
2972
2973        // Second request gets rate limited with 2 second retry-after
2974        let rate_limit_mock_2 = server
2975            .mock("POST", "/test2")
2976            .with_status(429)
2977            .with_header("Retry-After", "2")
2978            .expect(1)
2979            .create_async()
2980            .await;
2981
2982        // Success mocks for retries
2983        let success_mock_1 = server
2984            .mock("POST", "/test1")
2985            .with_status(200)
2986            .with_body(r#"{"result": "success1"}"#)
2987            .expect(1)
2988            .create_async()
2989            .await;
2990
2991        let success_mock_2 = server
2992            .mock("POST", "/test2")
2993            .with_status(200)
2994            .with_body(r#"{"result": "success2"}"#)
2995            .expect(1)
2996            .create_async()
2997            .await;
2998
2999        let http_client =
3000            HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3001                .unwrap()
3002                .with_test_backoff_policy();
3003        let request_body = serde_json::json!({"test": "data"});
3004
3005        let uri1 = format!("{}/test1", server.url());
3006        let uri2 = format!("{}/test2", server.url());
3007
3008        // Start both requests concurrently
3009        let start = std::time::Instant::now();
3010        let (result1, result2) = tokio::join!(
3011            http_client.make_post_request(&request_body, &uri1),
3012            http_client.make_post_request(&request_body, &uri2)
3013        );
3014        let elapsed = start.elapsed();
3015
3016        rate_limit_mock_1.assert();
3017        rate_limit_mock_2.assert();
3018        success_mock_1.assert();
3019        success_mock_2.assert();
3020
3021        assert!(result1.is_ok());
3022        assert!(result2.is_ok());
3023
3024        // Both requests should succeed, but the second should take longer due to the 2s retry-after
3025        // The total time should be at least 2 seconds since the shared retry_after state
3026        // gets updated by both requests
3027        assert!(elapsed >= Duration::from_millis(1800)); // Allow some tolerance
3028        assert!(elapsed <= Duration::from_millis(3000)); // Upper bound
3029
3030        // Check the final retry_after state - should be the latest (higher) value
3031        let final_retry_after = http_client.retry_after.read().await;
3032        assert!(final_retry_after.is_some());
3033
3034        // The retry_after should be set to the latest (higher) value from the two requests
3035        if let Some(retry_time) = *final_retry_after {
3036            // The retry_after time might be in the past now since we waited,
3037            // but it should be reasonable (not too far in past/future)
3038            let now = SystemTime::now();
3039            let diff = if retry_time > now {
3040                retry_time.duration_since(now).unwrap()
3041            } else {
3042                now.duration_since(retry_time).unwrap()
3043            };
3044
3045            // Should be within a reasonable range (the 2s retry-after plus some buffer)
3046            assert!(diff <= Duration::from_secs(3), "Retry time difference too large: {:?}", diff);
3047        }
3048    }
3049
3050    #[tokio::test]
3051    async fn test_get_snapshots() {
3052        let mut server = Server::new_async().await;
3053
3054        // Mock protocol states response
3055        let protocol_states_resp = r#"
3056        {
3057            "states": [
3058                {
3059                    "component_id": "component1",
3060                    "attributes": {
3061                        "attribute_1": "0x00000000000003e8"
3062                    },
3063                    "balances": {
3064                        "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x01f4"
3065                    }
3066                }
3067            ],
3068            "pagination": {
3069                "page": 0,
3070                "page_size": 100,
3071                "total": 1
3072            }
3073        }
3074        "#;
3075
3076        // Mock contract state response
3077        let contract_state_resp = r#"
3078        {
3079            "accounts": [
3080                {
3081                    "chain": "ethereum",
3082                    "address": "0x1111111111111111111111111111111111111111",
3083                    "title": "",
3084                    "slots": {},
3085                    "native_balance": "0x01f4",
3086                    "token_balances": {},
3087                    "code": "0x00",
3088                    "code_hash": "0x5c06b7c5b3d910fd33bc2229846f9ddaf91d584d9b196e16636901ac3a77077e",
3089                    "balance_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3090                    "code_modify_tx": "0x0000000000000000000000000000000000000000000000000000000000000000",
3091                    "creation_tx": null
3092                }
3093            ],
3094            "pagination": {
3095                "page": 0,
3096                "page_size": 100,
3097                "total": 1
3098            }
3099        }
3100        "#;
3101
3102        // Mock component TVL response
3103        let tvl_resp = r#"
3104        {
3105            "tvl": {
3106                "component1": 1000000.0
3107            },
3108            "pagination": {
3109                "page": 0,
3110                "page_size": 100,
3111                "total": 1
3112            }
3113        }
3114        "#;
3115
3116        let protocol_states_mock = server
3117            .mock("POST", "/v1/protocol_state")
3118            .expect(1)
3119            .with_body(protocol_states_resp)
3120            .create_async()
3121            .await;
3122
3123        let contract_state_mock = server
3124            .mock("POST", "/v1/contract_state")
3125            .expect(1)
3126            .with_body(contract_state_resp)
3127            .create_async()
3128            .await;
3129
3130        let tvl_mock = server
3131            .mock("POST", "/v1/component_tvl")
3132            .expect(1)
3133            .with_body(tvl_resp)
3134            .create_async()
3135            .await;
3136
3137        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3138            .expect("create client");
3139
3140        let component = tycho_common::models::protocol::ProtocolComponent {
3141            id: "component1".to_string(),
3142            protocol_system: "test_protocol".to_string(),
3143            protocol_type_name: "test_type".to_string(),
3144            chain: Chain::Ethereum,
3145            tokens: vec![],
3146            contract_addresses: vec![
3147                Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()
3148            ],
3149            static_attributes: HashMap::new(),
3150            change: tycho_common::models::ChangeType::Creation,
3151            creation_tx: Bytes::from_str(
3152                "0x0000000000000000000000000000000000000000000000000000000000000000",
3153            )
3154            .unwrap(),
3155            created_at: chrono::Utc::now().naive_utc(),
3156        };
3157
3158        let mut components = HashMap::new();
3159        components.insert("component1".to_string(), component);
3160
3161        let contract_ids =
3162            vec![Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap()];
3163
3164        let request = SnapshotParameters::new(
3165            Chain::Ethereum,
3166            "test_protocol",
3167            &components,
3168            &contract_ids,
3169            12345,
3170        );
3171
3172        let response = client
3173            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3174            .await
3175            .expect("get snapshots");
3176
3177        // Verify all mocks were called
3178        protocol_states_mock.assert();
3179        contract_state_mock.assert();
3180        tvl_mock.assert();
3181
3182        // Assert states
3183        assert_eq!(response.states.len(), 1);
3184        assert!(response
3185            .states
3186            .contains_key("component1"));
3187
3188        // Check that the state has the expected TVL
3189        let component_state = response
3190            .states
3191            .get("component1")
3192            .unwrap();
3193        assert_eq!(component_state.component_tvl, Some(1000000.0));
3194
3195        // Assert VM storage
3196        assert_eq!(response.vm_storage.len(), 1);
3197        let contract_addr = Bytes::from_str("0x1111111111111111111111111111111111111111").unwrap();
3198        assert!(response
3199            .vm_storage
3200            .contains_key(&contract_addr));
3201    }
3202
3203    #[tokio::test]
3204    async fn test_get_snapshots_empty_components() {
3205        let server = Server::new_async().await;
3206        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3207            .expect("create client");
3208
3209        let components = HashMap::new();
3210        let contract_ids = vec![];
3211
3212        let request = SnapshotParameters::new(
3213            Chain::Ethereum,
3214            "test_protocol",
3215            &components,
3216            &contract_ids,
3217            12345,
3218        );
3219
3220        let response = client
3221            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3222            .await
3223            .expect("get snapshots");
3224
3225        // Should return empty response without making any requests
3226        assert!(response.states.is_empty());
3227        assert!(response.vm_storage.is_empty());
3228    }
3229
3230    #[tokio::test]
3231    async fn test_get_snapshots_without_tvl() {
3232        let mut server = Server::new_async().await;
3233
3234        let protocol_states_resp = r#"
3235        {
3236            "states": [
3237                {
3238                    "component_id": "component1",
3239                    "attributes": {},
3240                    "balances": {}
3241                }
3242            ],
3243            "pagination": {
3244                "page": 0,
3245                "page_size": 100,
3246                "total": 1
3247            }
3248        }
3249        "#;
3250
3251        let protocol_states_mock = server
3252            .mock("POST", "/v1/protocol_state")
3253            .expect(1)
3254            .with_body(protocol_states_resp)
3255            .create_async()
3256            .await;
3257
3258        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3259            .expect("create client");
3260
3261        // Create test component
3262        let component = tycho_common::models::protocol::ProtocolComponent {
3263            id: "component1".to_string(),
3264            protocol_system: "test_protocol".to_string(),
3265            protocol_type_name: "test_type".to_string(),
3266            chain: Chain::Ethereum,
3267            tokens: vec![],
3268            contract_addresses: vec![],
3269            static_attributes: HashMap::new(),
3270            change: tycho_common::models::ChangeType::Creation,
3271            creation_tx: Bytes::from_str(
3272                "0x0000000000000000000000000000000000000000000000000000000000000000",
3273            )
3274            .unwrap(),
3275            created_at: chrono::Utc::now().naive_utc(),
3276        };
3277
3278        let mut components = HashMap::new();
3279        components.insert("component1".to_string(), component);
3280        let contract_ids = vec![];
3281
3282        let request = SnapshotParameters::new(
3283            Chain::Ethereum,
3284            "test_protocol",
3285            &components,
3286            &contract_ids,
3287            12345,
3288        )
3289        .include_balances(false)
3290        .include_tvl(false);
3291
3292        let response = client
3293            .get_snapshots(&request, None, RPC_CLIENT_CONCURRENCY)
3294            .await
3295            .expect("get snapshots");
3296
3297        // Verify only necessary mocks were called
3298        protocol_states_mock.assert();
3299        // No contract_state_mock.assert() since contract_ids is empty
3300        // No tvl_mock.assert() since include_tvl is false
3301
3302        assert_eq!(response.states.len(), 1);
3303        // Check that TVL is None since we didn't request it
3304        let component_state = response
3305            .states
3306            .get("component1")
3307            .unwrap();
3308        assert_eq!(component_state.component_tvl, None);
3309    }
3310
3311    #[tokio::test]
3312    async fn test_compression_enabled() {
3313        let mut server = Server::new_async().await;
3314        let server_resp = GET_CONTRACT_STATE_RESP;
3315
3316        // Compress the response using zstd
3317        let compressed_body =
3318            zstd::encode_all(server_resp.as_bytes(), 0).expect("compression failed");
3319
3320        let mocked_server = server
3321            .mock("POST", "/v1/contract_state")
3322            .expect(1)
3323            .with_header("Content-Encoding", "zstd")
3324            .with_body(compressed_body)
3325            .create_async()
3326            .await;
3327
3328        // Create client with compression enabled
3329        let client = HttpRPCClient::new(
3330            server.url().as_str(),
3331            HttpRPCClientOptions::new().with_compression(true),
3332        )
3333        .expect("create client");
3334
3335        let response = client
3336            .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3337            .await
3338            .expect("get state");
3339        let accounts = response;
3340
3341        mocked_server.assert();
3342        assert_eq!(accounts.data().len(), 1);
3343        assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3344    }
3345
3346    #[tokio::test]
3347    async fn test_compression_disabled() {
3348        let mut server = Server::new_async().await;
3349        let server_resp = GET_CONTRACT_STATE_RESP;
3350
3351        // Verify client does NOT send Accept-Encoding: zstd when compression is disabled
3352        // Instead, server should receive request without compression headers
3353        let mocked_server = server
3354            .mock("POST", "/v1/contract_state")
3355            .expect(1)
3356            .match_header("Accept-Encoding", mockito::Matcher::Missing)
3357            .with_status(200)
3358            .with_body(server_resp)
3359            .create_async()
3360            .await;
3361
3362        // Create client with compression disabled
3363        let client = HttpRPCClient::new(
3364            server.url().as_str(),
3365            HttpRPCClientOptions::new().with_compression(false),
3366        )
3367        .expect("create client");
3368
3369        let response = client
3370            .get_contract_state(ContractStateParams::new(Chain::Ethereum, ""))
3371            .await
3372            .expect("get state");
3373        let accounts = response;
3374
3375        // Verify the mock was called (client sent request without Accept-Encoding header)
3376        mocked_server.assert();
3377        assert_eq!(accounts.data().len(), 1);
3378        assert_eq!(accounts.data()[0].native_balance, Bytes::from(500u16.to_be_bytes()));
3379    }
3380
3381    #[rstest]
3382    #[case::single_page(2, 1000)]
3383    #[case::multiple_pages_within_concurrency(10, 2)]
3384    #[case::exceeds_concurrency_limit(60, 2)]
3385    #[tokio::test]
3386    async fn test_get_all_tokens_pagination_and_concurrency(
3387        #[case] total_tokens: usize,
3388        #[case] page_size: usize,
3389    ) {
3390        use std::sync::atomic::{AtomicUsize, Ordering};
3391
3392        let allowed_concurrency = 10;
3393
3394        let concurrent_requests = Arc::new(AtomicUsize::new(0));
3395        let max_concurrent = Arc::new(AtomicUsize::new(0));
3396
3397        let mut server = Server::new_async().await;
3398
3399        let total_pages = (total_tokens as f64 / page_size as f64).ceil() as i64;
3400
3401        // Mock all required pages
3402        for page in 0..total_pages {
3403            let concurrent = concurrent_requests.clone();
3404            let max_conc = max_concurrent.clone();
3405
3406            let tokens_in_page = {
3407                let start_idx = (page as usize) * page_size;
3408                let end_idx = ((page as usize + 1) * page_size).min(total_tokens);
3409                (start_idx..end_idx)
3410                    .map(|i| {
3411                        format!(
3412                            r#"{{
3413                            "chain": "ethereum",
3414                            "address": "0x{i:040x}",
3415                            "symbol": "TOKEN_{i}",
3416                            "decimals": 18,
3417                            "tax": 0,
3418                            "gas": [30000],
3419                            "quality": 100
3420                        }}"#
3421                        )
3422                    })
3423                    .collect::<Vec<_>>()
3424            };
3425
3426            let tokens_json = tokens_in_page.join(",");
3427            let response = format!(
3428                r#"{{
3429                    "tokens": [{tokens_json}],
3430                    "pagination": {{
3431                        "page": {page},
3432                        "page_size": {page_size},
3433                        "total": {total_tokens}
3434                    }}
3435                }}"#,
3436            );
3437
3438            server
3439                .mock("POST", "/v1/tokens")
3440                .expect(1)
3441                .with_chunked_body(move |w| {
3442                    // Track concurrent requests
3443                    let current = concurrent.fetch_add(1, Ordering::SeqCst);
3444                    max_conc.fetch_max(current + 1, Ordering::SeqCst);
3445
3446                    // Simulate some work to increase likelihood of concurrent requests
3447                    std::thread::sleep(Duration::from_millis(10));
3448
3449                    concurrent.fetch_sub(1, Ordering::SeqCst);
3450
3451                    w.write_all(response.as_bytes())
3452                })
3453                .create_async()
3454                .await;
3455        }
3456
3457        let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
3458            .expect("create client");
3459
3460        let tokens = client
3461            .get_all_tokens(
3462                AllTokensParams::new(Chain::Ethereum, allowed_concurrency)
3463                    .with_chunk_size(page_size),
3464            )
3465            .await
3466            .expect("get all tokens");
3467
3468        // Verify concurrency was respected
3469        let max = max_concurrent.load(Ordering::SeqCst);
3470        let expected_max_concurrency = (total_pages as usize)
3471            .saturating_sub(1)
3472            .min(allowed_concurrency);
3473        assert!(
3474            max <= allowed_concurrency,
3475            "Expected max concurrent requests <= {allowed_concurrency}, got {max}"
3476        );
3477
3478        // For cases with multiple pages, verify we actually used concurrency
3479        if total_pages > 1 && expected_max_concurrency > 1 {
3480            assert!(
3481                max > 0,
3482                "Expected some concurrent requests for multi-page response, got {max}"
3483            );
3484        }
3485
3486        // Verify we got all expected tokens
3487        assert_eq!(
3488            tokens.len(),
3489            total_tokens,
3490            "Expected {total_tokens} tokens, got {}",
3491            tokens.len()
3492        );
3493
3494        // Verify tokens are in the expected order
3495        for (i, token) in tokens.iter().enumerate() {
3496            assert_eq!(token.symbol, format!("TOKEN_{i}"), "Token at index {i} has wrong symbol");
3497        }
3498    }
3499}