tycho_client/
stream.rs

1use std::{
2    cmp::max,
3    collections::{HashMap, HashSet},
4    env,
5    time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
12
13use crate::{
14    deltas::DeltasClient,
15    feed::{
16        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
17        BlockSynchronizer, BlockSynchronizerError, FeedMessage,
18    },
19    rpc::{HttpRPCClientOptions, RPCClient},
20    HttpRPCClient, WsDeltasClient,
21};
22
23#[derive(Error, Debug)]
24pub enum StreamError {
25    #[error("Error during stream set up: {0}")]
26    SetUpError(String),
27
28    #[error("WebSocket client connection error: {0}")]
29    WebSocketConnectionError(String),
30
31    #[error("BlockSynchronizer error: {0}")]
32    BlockSynchronizerError(String),
33}
34
35#[non_exhaustive]
36#[derive(Clone, Debug)]
37pub enum RetryConfiguration {
38    Constant(ConstantRetryConfiguration),
39}
40
41impl RetryConfiguration {
42    pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
43        RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
44    }
45}
46
47#[derive(Clone, Debug)]
48pub struct ConstantRetryConfiguration {
49    max_attempts: u64,
50    cooldown: Duration,
51}
52
53pub struct TychoStreamBuilder {
54    tycho_url: String,
55    chain: Chain,
56    exchanges: HashMap<String, ComponentFilter>,
57    block_time: u64,
58    timeout: u64,
59    startup_timeout: Duration,
60    max_missed_blocks: u64,
61    state_sync_retry_config: RetryConfiguration,
62    websockets_retry_config: RetryConfiguration,
63    no_state: bool,
64    auth_key: Option<String>,
65    no_tls: bool,
66    include_tvl: bool,
67    compression: bool,
68}
69
70impl TychoStreamBuilder {
71    /// Creates a new `TychoStreamBuilder` with the given Tycho URL and blockchain network.
72    /// Initializes the builder with default values for block time and timeout based on the chain.
73    pub fn new(tycho_url: &str, chain: Chain) -> Self {
74        let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
75        Self {
76            tycho_url: tycho_url.to_string(),
77            chain,
78            exchanges: HashMap::new(),
79            block_time,
80            timeout,
81            startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
82            max_missed_blocks,
83            state_sync_retry_config: RetryConfiguration::constant(
84                32,
85                Duration::from_secs(max(block_time / 4, 2)),
86            ),
87            websockets_retry_config: RetryConfiguration::constant(
88                128,
89                Duration::from_secs(max(block_time / 6, 1)),
90            ),
91            no_state: false,
92            auth_key: None,
93            no_tls: true,
94            include_tvl: false,
95            compression: true,
96        }
97    }
98
99    /// Returns the default block_time, timeout and max_missed_blocks values for the given
100    /// blockchain network.
101    fn default_timing(chain: &Chain) -> (u64, u64, u64) {
102        match chain {
103            Chain::Ethereum => (12, 36, 50),
104            Chain::Starknet => (2, 8, 50),
105            Chain::ZkSync => (3, 12, 50),
106            Chain::Arbitrum => (1, 2, 100), // Typically closer to 0.25s
107            Chain::Base => (2, 12, 50),
108            Chain::Bsc => (1, 12, 50),
109            Chain::Unichain => (1, 10, 100),
110        }
111    }
112
113    /// Adds an exchange and its corresponding filter to the Tycho client.
114    pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
115        self.exchanges
116            .insert(name.to_string(), filter);
117        self
118    }
119
120    /// Sets the block time for the Tycho client.
121    pub fn block_time(mut self, block_time: u64) -> Self {
122        self.block_time = block_time;
123        self
124    }
125
126    /// Sets the timeout duration for network operations.
127    pub fn timeout(mut self, timeout: u64) -> Self {
128        self.timeout = timeout;
129        self
130    }
131
132    pub fn startup_timeout(mut self, timeout: Duration) -> Self {
133        self.startup_timeout = timeout;
134        self
135    }
136
137    pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
138        self.max_missed_blocks = max_missed_blocks;
139        self
140    }
141
142    pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
143        self.websockets_retry_config = retry_config.clone();
144        self.warn_on_potential_timing_issues();
145        self
146    }
147
148    pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
149        self.state_sync_retry_config = retry_config.clone();
150        self.warn_on_potential_timing_issues();
151        self
152    }
153
154    fn warn_on_potential_timing_issues(&self) {
155        let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
156            (&self.state_sync_retry_config, &self.websockets_retry_config);
157
158        if ws_config.cooldown >= state_config.cooldown {
159            warn!(
160                "Websocket cooldown should be < than state syncronizer cooldown \
161                to avoid spending retries due to disconnected websocket."
162            )
163        }
164    }
165
166    /// Configures the client to exclude state updates from the stream.
167    pub fn no_state(mut self, no_state: bool) -> Self {
168        self.no_state = no_state;
169        self
170    }
171
172    /// Sets the API key for authenticating with the Tycho server.
173    ///
174    /// Optionally you can set the TYCHO_AUTH_TOKEN env var instead. Make sure to set no_tsl
175    /// to false if you do this.
176    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
177        self.auth_key = auth_key;
178        self.no_tls = false;
179        self
180    }
181
182    /// Disables TLS/SSL for the connection, using `http` and `ws` protocols.
183    pub fn no_tls(mut self, no_tls: bool) -> Self {
184        self.no_tls = no_tls;
185        self
186    }
187
188    /// Configures the client to include TVL in the stream.
189    ///
190    /// If set to true, this will increase start-up time due to additional requests.
191    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
192        self.include_tvl = include_tvl;
193        self
194    }
195
196    /// Disables compression for RPC and WebSocket communication.
197    /// By default, messages are compressed using zstd.
198    pub fn disable_compression(mut self) -> Self {
199        self.compression = false;
200        self
201    }
202
203    /// Builds and starts the Tycho client, connecting to the Tycho server and
204    /// setting up the synchronization of exchange components.
205    pub async fn build(
206        self,
207    ) -> Result<
208        (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
209        StreamError,
210    > {
211        if self.exchanges.is_empty() {
212            return Err(StreamError::SetUpError(
213                "At least one exchange must be registered.".to_string(),
214            ));
215        }
216
217        // Attempt to read the authentication key from the environment variable if not provided
218        let auth_key = self
219            .auth_key
220            .clone()
221            .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
222
223        info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
224
225        // Determine the URLs based on the TLS setting
226        let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
227            info!("Using non-secure connection: ws:// and http://");
228            let tycho_ws_url = format!("ws://{}", self.tycho_url);
229            let tycho_rpc_url = format!("http://{}", self.tycho_url);
230            (tycho_ws_url, tycho_rpc_url)
231        } else {
232            info!("Using secure connection: wss:// and https://");
233            let tycho_ws_url = format!("wss://{}", self.tycho_url);
234            let tycho_rpc_url = format!("https://{}", self.tycho_url);
235            (tycho_ws_url, tycho_rpc_url)
236        };
237
238        // Initialize the WebSocket client
239        let ws_client = match &self.websockets_retry_config {
240            RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
241                &tycho_ws_url,
242                auth_key.as_deref(),
243                config.max_attempts,
244                config.cooldown,
245            ),
246        }
247        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
248        let rpc_client = HttpRPCClient::new(
249            &tycho_rpc_url,
250            HttpRPCClientOptions::new()
251                .with_auth_key(auth_key)
252                .with_compression(self.compression),
253        )
254        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
255        let ws_jh = ws_client
256            .connect()
257            .await
258            .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
259
260        // Create and configure the BlockSynchronizer
261        let mut block_sync = BlockSynchronizer::new(
262            Duration::from_secs(self.block_time),
263            Duration::from_secs(self.timeout),
264            self.max_missed_blocks,
265        );
266
267        self.display_available_protocols(&rpc_client)
268            .await;
269
270        // Register each exchange with the BlockSynchronizer
271        for (name, filter) in self.exchanges {
272            info!("Registering exchange: {}", name);
273            let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
274            let sync = match &self.state_sync_retry_config {
275                RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
276                    id.clone(),
277                    true,
278                    filter,
279                    retry_config.max_attempts,
280                    retry_config.cooldown,
281                    !self.no_state,
282                    self.include_tvl,
283                    self.compression,
284                    rpc_client.clone(),
285                    ws_client.clone(),
286                    self.block_time + self.timeout,
287                ),
288            };
289            block_sync = block_sync.register_synchronizer(id, sync);
290        }
291
292        // Start the BlockSynchronizer and monitor for disconnections
293        let (sync_jh, rx) = block_sync
294            .run()
295            .await
296            .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
297
298        // Monitor WebSocket and BlockSynchronizer futures
299        let handle = tokio::spawn(async move {
300            tokio::select! {
301                res = ws_jh => {
302                    let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
303                }
304                res = sync_jh => {
305                    res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
306                }
307            }
308            if let Err(e) = ws_client.close().await {
309                warn!(?e, "Failed to close WebSocket client");
310            }
311        });
312
313        Ok((handle, rx))
314    }
315
316    /// Displays the other available protocols not registered to within this stream builder, for the
317    /// given chain.
318    async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
319        let available_protocols_set = rpc_client
320            .get_protocol_systems(&ProtocolSystemsRequestBody {
321                chain: self.chain,
322                pagination: PaginationParams { page: 0, page_size: 100 },
323            })
324            .await
325            .map(|resp| {
326                resp.protocol_systems
327                    .into_iter()
328                    .collect::<HashSet<_>>()
329            })
330            .map_err(|e| {
331                warn!(
332                    "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
333                );
334                e
335            })
336            .ok();
337
338        if let Some(not_requested_protocols) = available_protocols_set
339            .map(|available_protocols_set| {
340                let requested_protocol_set = self
341                    .exchanges
342                    .keys()
343                    .cloned()
344                    .collect::<HashSet<_>>();
345
346                available_protocols_set
347                    .difference(&requested_protocol_set)
348                    .cloned()
349                    .collect::<Vec<_>>()
350            })
351            .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
352        {
353            info!("Other available protocols: {}", not_requested_protocols.join(", "))
354        }
355    }
356}
357
358#[cfg(test)]
359mod tests {
360    use super::*;
361
362    #[test]
363    fn test_retry_configuration_constant() {
364        let config = RetryConfiguration::constant(5, Duration::from_secs(10));
365        match config {
366            RetryConfiguration::Constant(c) => {
367                assert_eq!(c.max_attempts, 5);
368                assert_eq!(c.cooldown, Duration::from_secs(10));
369            }
370        }
371    }
372
373    #[test]
374    fn test_stream_builder_retry_configs() {
375        let mut builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
376        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
377        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));
378
379        builder = builder
380            .websockets_retry_config(&ws_config)
381            .state_synchronizer_retry_config(&state_config);
382
383        // Verify configs are stored correctly by checking they match expected values
384        match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
385            (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
386                assert_eq!(ws.max_attempts, 10);
387                assert_eq!(ws.cooldown, Duration::from_secs(2));
388                assert_eq!(state.max_attempts, 20);
389                assert_eq!(state.cooldown, Duration::from_secs(5));
390            }
391        }
392    }
393
394    #[test]
395    fn test_default_compression() {
396        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
397        assert!(builder.compression, "Compression should be enabled by default.");
398    }
399
400    #[tokio::test]
401    async fn test_no_exchanges() {
402        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
403            .auth_key(Some("my_api_key".into()))
404            .build()
405            .await;
406        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
407    }
408
409    #[ignore = "require tycho gateway"]
410    #[tokio::test]
411    async fn test_simple_build() {
412        let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
413        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
414            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
415            .auth_key(Some(token))
416            .build()
417            .await;
418
419        dbg!(&receiver);
420
421        assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
422    }
423}