tycho_client/
stream.rs

1use std::{
2    cmp::max,
3    collections::{HashMap, HashSet},
4    env,
5    time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
12
13use crate::{
14    deltas::DeltasClient,
15    feed::{
16        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
17        BlockSynchronizer, BlockSynchronizerError, FeedMessage,
18    },
19    rpc::RPCClient,
20    HttpRPCClient, WsDeltasClient,
21};
22
23#[derive(Error, Debug)]
24pub enum StreamError {
25    #[error("Error during stream set up: {0}")]
26    SetUpError(String),
27
28    #[error("WebSocket client connection error: {0}")]
29    WebSocketConnectionError(String),
30
31    #[error("BlockSynchronizer error: {0}")]
32    BlockSynchronizerError(String),
33}
34
/// Retry strategy used by the stream's websocket client and state synchronizers.
///
/// Marked `#[non_exhaustive]` so further strategies can be added without breaking downstream
/// crates.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RetryConfiguration {
    /// Retry up to a fixed number of attempts with the same cooldown each time.
    Constant(ConstantRetryConfiguration),
}

impl RetryConfiguration {
    /// Builds a constant retry configuration.
    ///
    /// * `max_attempts` - attempt limit handed to the component that performs the retries.
    /// * `cooldown` - cooldown duration handed to the component that performs the retries.
    pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
        RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
    }
}

/// Parameters of [`RetryConfiguration::Constant`].
///
/// Fields are private; construct values through [`RetryConfiguration::constant`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConstantRetryConfiguration {
    /// Attempt limit.
    max_attempts: u64,
    /// Cooldown applied between attempts.
    cooldown: Duration,
}
52
53pub struct TychoStreamBuilder {
54    tycho_url: String,
55    chain: Chain,
56    exchanges: HashMap<String, ComponentFilter>,
57    block_time: u64,
58    timeout: u64,
59    startup_timeout: Duration,
60    max_missed_blocks: u64,
61    state_sync_retry_config: RetryConfiguration,
62    websockets_retry_config: RetryConfiguration,
63    no_state: bool,
64    auth_key: Option<String>,
65    no_tls: bool,
66    include_tvl: bool,
67}
68
69impl TychoStreamBuilder {
70    /// Creates a new `TychoStreamBuilder` with the given Tycho URL and blockchain network.
71    /// Initializes the builder with default values for block time and timeout based on the chain.
72    pub fn new(tycho_url: &str, chain: Chain) -> Self {
73        let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
74        Self {
75            tycho_url: tycho_url.to_string(),
76            chain,
77            exchanges: HashMap::new(),
78            block_time,
79            timeout,
80            startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
81            max_missed_blocks,
82            state_sync_retry_config: RetryConfiguration::constant(
83                32,
84                Duration::from_secs(max(block_time / 4, 2)),
85            ),
86            websockets_retry_config: RetryConfiguration::constant(
87                128,
88                Duration::from_secs(max(block_time / 6, 1)),
89            ),
90            no_state: false,
91            auth_key: None,
92            no_tls: true,
93            include_tvl: false,
94        }
95    }
96
97    /// Returns the default block_time, timeout and max_missed_blocks values for the given
98    /// blockchain network.
99    fn default_timing(chain: &Chain) -> (u64, u64, u64) {
100        match chain {
101            Chain::Ethereum => (12, 36, 50),
102            Chain::Starknet => (2, 8, 50),
103            Chain::ZkSync => (3, 12, 50),
104            Chain::Arbitrum => (1, 2, 100), // Typically closer to 0.25s
105            Chain::Base => (2, 12, 50),
106            Chain::Bsc => (1, 12, 50),
107            Chain::Unichain => (1, 10, 100),
108        }
109    }
110
111    /// Adds an exchange and its corresponding filter to the Tycho client.
112    pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
113        self.exchanges
114            .insert(name.to_string(), filter);
115        self
116    }
117
118    /// Sets the block time for the Tycho client.
119    pub fn block_time(mut self, block_time: u64) -> Self {
120        self.block_time = block_time;
121        self
122    }
123
124    /// Sets the timeout duration for network operations.
125    pub fn timeout(mut self, timeout: u64) -> Self {
126        self.timeout = timeout;
127        self
128    }
129
130    pub fn startup_timeout(mut self, timeout: Duration) -> Self {
131        self.startup_timeout = timeout;
132        self
133    }
134
135    pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
136        self.max_missed_blocks = max_missed_blocks;
137        self
138    }
139
140    pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
141        self.websockets_retry_config = retry_config.clone();
142        self.warn_on_potential_timing_issues();
143        self
144    }
145
146    pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
147        self.state_sync_retry_config = retry_config.clone();
148        self.warn_on_potential_timing_issues();
149        self
150    }
151
152    fn warn_on_potential_timing_issues(&self) {
153        let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
154            (&self.state_sync_retry_config, &self.websockets_retry_config);
155
156        if ws_config.cooldown >= state_config.cooldown {
157            warn!(
158                "Websocket cooldown should be < than state syncronizer cooldown \
159                to avoid spending retries due to disconnected websocket."
160            )
161        }
162    }
163
164    /// Configures the client to exclude state updates from the stream.
165    pub fn no_state(mut self, no_state: bool) -> Self {
166        self.no_state = no_state;
167        self
168    }
169
170    /// Sets the API key for authenticating with the Tycho server.
171    ///
172    /// Optionally you can set the TYCHO_AUTH_TOKEN env var instead. Make sure to set no_tsl
173    /// to false if you do this.
174    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
175        self.auth_key = auth_key;
176        self.no_tls = false;
177        self
178    }
179
180    /// Disables TLS/SSL for the connection, using `http` and `ws` protocols.
181    pub fn no_tls(mut self, no_tls: bool) -> Self {
182        self.no_tls = no_tls;
183        self
184    }
185
186    /// Configures the client to include TVL in the stream.
187    ///
188    /// If set to true, this will increase start-up time due to additional requests.
189    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
190        self.include_tvl = include_tvl;
191        self
192    }
193
194    /// Builds and starts the Tycho client, connecting to the Tycho server and
195    /// setting up the synchronization of exchange components.
196    pub async fn build(
197        self,
198    ) -> Result<
199        (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
200        StreamError,
201    > {
202        if self.exchanges.is_empty() {
203            return Err(StreamError::SetUpError(
204                "At least one exchange must be registered.".to_string(),
205            ));
206        }
207
208        // Attempt to read the authentication key from the environment variable if not provided
209        let auth_key = self
210            .auth_key
211            .clone()
212            .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
213
214        info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
215
216        // Determine the URLs based on the TLS setting
217        let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
218            info!("Using non-secure connection: ws:// and http://");
219            let tycho_ws_url = format!("ws://{}", self.tycho_url);
220            let tycho_rpc_url = format!("http://{}", self.tycho_url);
221            (tycho_ws_url, tycho_rpc_url)
222        } else {
223            info!("Using secure connection: wss:// and https://");
224            let tycho_ws_url = format!("wss://{}", self.tycho_url);
225            let tycho_rpc_url = format!("https://{}", self.tycho_url);
226            (tycho_ws_url, tycho_rpc_url)
227        };
228
229        // Initialize the WebSocket client
230        #[allow(unreachable_patterns)]
231        let ws_client = match &self.websockets_retry_config {
232            RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
233                &tycho_ws_url,
234                auth_key.as_deref(),
235                config.max_attempts,
236                config.cooldown,
237            ),
238            _ => {
239                return Err(StreamError::SetUpError(
240                    "Unknown websocket configuration variant!".to_string(),
241                ));
242            }
243        }
244        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
245        let rpc_client = HttpRPCClient::new(&tycho_rpc_url, auth_key.as_deref())
246            .map_err(|e| StreamError::SetUpError(e.to_string()))?;
247        let ws_jh = ws_client
248            .connect()
249            .await
250            .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
251
252        // Create and configure the BlockSynchronizer
253        let mut block_sync = BlockSynchronizer::new(
254            Duration::from_secs(self.block_time),
255            Duration::from_secs(self.timeout),
256            self.max_missed_blocks,
257        );
258
259        self.display_available_protocols(&rpc_client)
260            .await;
261
262        // Register each exchange with the BlockSynchronizer
263        for (name, filter) in self.exchanges {
264            info!("Registering exchange: {}", name);
265            let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
266            #[allow(unreachable_patterns)]
267            let sync = match &self.state_sync_retry_config {
268                RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
269                    id.clone(),
270                    true,
271                    filter,
272                    retry_config.max_attempts,
273                    retry_config.cooldown,
274                    !self.no_state,
275                    self.include_tvl,
276                    rpc_client.clone(),
277                    ws_client.clone(),
278                    self.block_time + self.timeout,
279                ),
280                _ => {
281                    return Err(StreamError::SetUpError(
282                        "Unknown state synchronizer configuration variant!".to_string(),
283                    ));
284                }
285            };
286            block_sync = block_sync.register_synchronizer(id, sync);
287        }
288
289        // Start the BlockSynchronizer and monitor for disconnections
290        let (sync_jh, rx) = block_sync
291            .run()
292            .await
293            .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
294
295        // Monitor WebSocket and BlockSynchronizer futures
296        let handle = tokio::spawn(async move {
297            tokio::select! {
298                res = ws_jh => {
299                    let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
300                }
301                res = sync_jh => {
302                    res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
303                }
304            }
305            if let Err(e) = ws_client.close().await {
306                warn!(?e, "Failed to close WebSocket client");
307            }
308        });
309
310        Ok((handle, rx))
311    }
312
313    /// Displays the other available protocols not registered to within this stream builder, for the
314    /// given chain.
315    async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
316        let available_protocols_set = rpc_client
317            .get_protocol_systems(&ProtocolSystemsRequestBody {
318                chain: self.chain,
319                pagination: PaginationParams { page: 0, page_size: 100 },
320            })
321            .await
322            .map(|resp| {
323                resp.protocol_systems
324                    .into_iter()
325                    .collect::<HashSet<_>>()
326            })
327            .map_err(|e| {
328                warn!(
329                    "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
330                );
331                e
332            })
333            .ok();
334
335        if let Some(not_requested_protocols) = available_protocols_set
336            .map(|available_protocols_set| {
337                let requested_protocol_set = self
338                    .exchanges
339                    .keys()
340                    .cloned()
341                    .collect::<HashSet<_>>();
342
343                available_protocols_set
344                    .difference(&requested_protocol_set)
345                    .cloned()
346                    .collect::<Vec<_>>()
347            })
348            .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
349        {
350            info!("Other available protocols: {}", not_requested_protocols.join(", "))
351        }
352    }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// `RetryConfiguration::constant` stores both arguments unchanged.
    #[test]
    fn test_retry_configuration_constant() {
        let RetryConfiguration::Constant(c) =
            RetryConfiguration::constant(5, Duration::from_secs(10));

        assert_eq!(c.max_attempts, 5);
        assert_eq!(c.cooldown, Duration::from_secs(10));
    }

    /// Both retry-config setters store the configuration they are given.
    #[test]
    fn test_stream_builder_retry_configs() {
        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));

        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .websockets_retry_config(&ws_config)
            .state_synchronizer_retry_config(&state_config);

        // Verify configs are stored correctly by checking they match expected values
        let (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) =
            (&builder.websockets_retry_config, &builder.state_sync_retry_config);
        assert_eq!(ws.max_attempts, 10);
        assert_eq!(ws.cooldown, Duration::from_secs(2));
        assert_eq!(state.max_attempts, 20);
        assert_eq!(state.cooldown, Duration::from_secs(5));
    }

    /// Building without any registered exchange is rejected.
    #[tokio::test]
    async fn test_no_exchanges() {
        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .auth_key(Some("my_api_key".into()))
            .build()
            .await;
        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
    }

    /// End-to-end build against a live gateway; needs `TYCHO_AUTH_TOKEN` in the environment.
    #[ignore = "require tycho gateway"]
    #[tokio::test]
    async fn test_simple_build() {
        let token = env::var("TYCHO_AUTH_TOKEN")
            .expect("TYCHO_AUTH_TOKEN must be set to run this test");
        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
            .auth_key(Some(token))
            .build()
            .await;

        // Report the set-up error in the failure message.
        assert!(
            receiver.is_ok(),
            "Client should build successfully with exchanges registered. Error: {:?}",
            receiver.as_ref().err()
        );
    }
}