tycho_client/
stream.rs

1use std::{
2    cmp::max,
3    collections::{HashMap, HashSet},
4    env,
5    time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
12
13use crate::{
14    deltas::DeltasClient,
15    feed::{
16        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
17        BlockSynchronizer, FeedMessage,
18    },
19    rpc::RPCClient,
20    HttpRPCClient, WsDeltasClient,
21};
22
23#[derive(Error, Debug)]
24pub enum StreamError {
25    #[error("Error during stream set up: {0}")]
26    SetUpError(String),
27
28    #[error("WebSocket client connection error: {0}")]
29    WebSocketConnectionError(String),
30
31    #[error("BlockSynchronizer error: {0}")]
32    BlockSynchronizerError(String),
33}
34
35#[non_exhaustive]
36#[derive(Clone, Debug)]
37pub enum RetryConfiguration {
38    Constant(ConstantRetryConfiguration),
39}
40
41impl RetryConfiguration {
42    pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
43        RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
44    }
45}
46
47#[derive(Clone, Debug)]
48pub struct ConstantRetryConfiguration {
49    max_attempts: u64,
50    cooldown: Duration,
51}
52
53pub struct TychoStreamBuilder {
54    tycho_url: String,
55    chain: Chain,
56    exchanges: HashMap<String, ComponentFilter>,
57    block_time: u64,
58    timeout: u64,
59    max_missed_blocks: u64,
60    state_sync_retry_config: RetryConfiguration,
61    websockets_retry_config: RetryConfiguration,
62    no_state: bool,
63    auth_key: Option<String>,
64    no_tls: bool,
65    include_tvl: bool,
66}
67
68impl TychoStreamBuilder {
69    /// Creates a new `TychoStreamBuilder` with the given Tycho URL and blockchain network.
70    /// Initializes the builder with default values for block time and timeout based on the chain.
71    pub fn new(tycho_url: &str, chain: Chain) -> Self {
72        let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
73        Self {
74            tycho_url: tycho_url.to_string(),
75            chain,
76            exchanges: HashMap::new(),
77            block_time,
78            timeout,
79            max_missed_blocks,
80            state_sync_retry_config: RetryConfiguration::constant(
81                32,
82                Duration::from_secs(max(block_time / 2, 2)),
83            ),
84            websockets_retry_config: RetryConfiguration::constant(
85                128,
86                Duration::from_secs(max(block_time / 4, 1)),
87            ),
88            no_state: false,
89            auth_key: None,
90            no_tls: true,
91            include_tvl: false,
92        }
93    }
94
95    /// Returns the default block_time, timeout and max_missed_blocks values for the given
96    /// blockchain network.
97    fn default_timing(chain: &Chain) -> (u64, u64, u64) {
98        match chain {
99            Chain::Ethereum => (12, 36, 10),
100            Chain::Starknet => (2, 8, 50),
101            Chain::ZkSync => (3, 12, 50),
102            Chain::Arbitrum => (1, 2, 100), // Typically closer to 0.25s
103            Chain::Base => (2, 12, 50),
104            Chain::Unichain => (1, 10, 100),
105        }
106    }
107
108    /// Adds an exchange and its corresponding filter to the Tycho client.
109    pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
110        self.exchanges
111            .insert(name.to_string(), filter);
112        self
113    }
114
115    /// Sets the block time for the Tycho client.
116    pub fn block_time(mut self, block_time: u64) -> Self {
117        self.block_time = block_time;
118        self
119    }
120
121    /// Sets the timeout duration for network operations.
122    pub fn timeout(mut self, timeout: u64) -> Self {
123        self.timeout = timeout;
124        self
125    }
126
127    pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
128        self.max_missed_blocks = max_missed_blocks;
129        self
130    }
131
132    pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
133        self.websockets_retry_config = retry_config.clone();
134        self.warn_on_potential_timing_issues();
135        self
136    }
137
138    pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
139        self.state_sync_retry_config = retry_config.clone();
140        self.warn_on_potential_timing_issues();
141        self
142    }
143
144    fn warn_on_potential_timing_issues(&self) {
145        let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
146            (&self.state_sync_retry_config, &self.websockets_retry_config);
147
148        if ws_config.cooldown >= state_config.cooldown {
149            warn!(
150                "Websocket cooldown should be < than state syncronizer cooldown \
151                to avoid spending retries due to disconnected websocket."
152            )
153        }
154    }
155
156    /// Configures the client to exclude state updates from the stream.
157    pub fn no_state(mut self, no_state: bool) -> Self {
158        self.no_state = no_state;
159        self
160    }
161
162    /// Sets the API key for authenticating with the Tycho server.
163    ///
164    /// Optionally you can set the TYCHO_AUTH_TOKEN env var instead. Make sure to set no_tsl
165    /// to false if you do this.
166    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
167        self.auth_key = auth_key;
168        self.no_tls = false;
169        self
170    }
171
172    /// Disables TLS/SSL for the connection, using `http` and `ws` protocols.
173    pub fn no_tls(mut self, no_tls: bool) -> Self {
174        self.no_tls = no_tls;
175        self
176    }
177
178    /// Configures the client to include TVL in the stream.
179    ///
180    /// If set to true, this will increase start-up time due to additional requests.
181    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
182        self.include_tvl = include_tvl;
183        self
184    }
185
186    /// Builds and starts the Tycho client, connecting to the Tycho server and
187    /// setting up the synchronization of exchange components.
188    pub async fn build(
189        self,
190    ) -> Result<(JoinHandle<()>, Receiver<FeedMessage<BlockHeader>>), StreamError> {
191        if self.exchanges.is_empty() {
192            return Err(StreamError::SetUpError(
193                "At least one exchange must be registered.".to_string(),
194            ));
195        }
196
197        // Attempt to read the authentication key from the environment variable if not provided
198        let auth_key = self
199            .auth_key
200            .clone()
201            .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
202
203        // Determine the URLs based on the TLS setting
204        let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
205            info!("Using non-secure connection: ws:// and http://");
206            let tycho_ws_url = format!("ws://{}", self.tycho_url);
207            let tycho_rpc_url = format!("http://{}", self.tycho_url);
208            (tycho_ws_url, tycho_rpc_url)
209        } else {
210            info!("Using secure connection: wss:// and https://");
211            let tycho_ws_url = format!("wss://{}", self.tycho_url);
212            let tycho_rpc_url = format!("https://{}", self.tycho_url);
213            (tycho_ws_url, tycho_rpc_url)
214        };
215
216        // Initialize the WebSocket client
217        #[allow(unreachable_patterns)]
218        let ws_client = match &self.websockets_retry_config {
219            RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
220                &tycho_ws_url,
221                auth_key.as_deref(),
222                config.max_attempts,
223                config.cooldown,
224            ),
225            _ => {
226                return Err(StreamError::SetUpError(
227                    "Unknown websocket configuration variant!".to_string(),
228                ));
229            }
230        }
231        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
232        let rpc_client = HttpRPCClient::new(&tycho_rpc_url, auth_key.as_deref())
233            .map_err(|e| StreamError::SetUpError(e.to_string()))?;
234        let ws_jh = ws_client
235            .connect()
236            .await
237            .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
238
239        // Create and configure the BlockSynchronizer
240        let mut block_sync = BlockSynchronizer::new(
241            Duration::from_secs(self.block_time),
242            Duration::from_secs(self.timeout),
243            self.max_missed_blocks,
244        );
245
246        self.display_available_protocols(&rpc_client)
247            .await;
248
249        // Register each exchange with the BlockSynchronizer
250        for (name, filter) in self.exchanges {
251            info!("Registering exchange: {}", name);
252            let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
253            #[allow(unreachable_patterns)]
254            let sync = match &self.state_sync_retry_config {
255                RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
256                    id.clone(),
257                    true,
258                    filter,
259                    retry_config.max_attempts,
260                    retry_config.cooldown,
261                    !self.no_state,
262                    self.include_tvl,
263                    rpc_client.clone(),
264                    ws_client.clone(),
265                    self.block_time + self.timeout,
266                ),
267                _ => {
268                    return Err(StreamError::SetUpError(
269                        "Unknown state synchronizer configuration variant!".to_string(),
270                    ));
271                }
272            };
273            block_sync = block_sync.register_synchronizer(id, sync);
274        }
275
276        // Start the BlockSynchronizer and monitor for disconnections
277        let (sync_jh, rx) = block_sync
278            .run()
279            .await
280            .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
281
282        // Monitor WebSocket and BlockSynchronizer futures
283        let handle = tokio::spawn(async move {
284            tokio::select! {
285                res = ws_jh => {
286                    let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
287                }
288                res = sync_jh => {
289                    res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
290                }
291            }
292            if let Err(e) = ws_client.close().await {
293                warn!(?e, "Failed to close WebSocket client");
294            }
295        });
296
297        Ok((handle, rx))
298    }
299
300    /// Displays the other available protocols not registered to within this stream builder, for the
301    /// given chain.
302    async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
303        let available_protocols_set = rpc_client
304            .get_protocol_systems(&ProtocolSystemsRequestBody {
305                chain: self.chain,
306                pagination: PaginationParams { page: 0, page_size: 100 },
307            })
308            .await
309            .map(|resp| {
310                resp.protocol_systems
311                    .into_iter()
312                    .collect::<HashSet<_>>()
313            })
314            .map_err(|e| {
315                warn!(
316                    "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
317                );
318                e
319            })
320            .ok();
321
322        if let Some(not_requested_protocols) = available_protocols_set
323            .map(|available_protocols_set| {
324                let requested_protocol_set = self
325                    .exchanges
326                    .keys()
327                    .cloned()
328                    .collect::<HashSet<_>>();
329
330                available_protocols_set
331                    .difference(&requested_protocol_set)
332                    .cloned()
333                    .collect::<Vec<_>>()
334            })
335            .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
336        {
337            info!("Other available protocols: {}", not_requested_protocols.join(", "))
338        }
339    }
340}
341
342#[cfg(test)]
343mod tests {
344    use super::*;
345
346    #[test]
347    fn test_retry_configuration_constant() {
348        let config = RetryConfiguration::constant(5, Duration::from_secs(10));
349        match config {
350            RetryConfiguration::Constant(c) => {
351                assert_eq!(c.max_attempts, 5);
352                assert_eq!(c.cooldown, Duration::from_secs(10));
353            }
354        }
355    }
356
357    #[test]
358    fn test_stream_builder_retry_configs() {
359        let mut builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
360        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
361        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));
362
363        builder = builder
364            .websockets_retry_config(&ws_config)
365            .state_synchronizer_retry_config(&state_config);
366
367        // Verify configs are stored correctly by checking they match expected values
368        match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
369            (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
370                assert_eq!(ws.max_attempts, 10);
371                assert_eq!(ws.cooldown, Duration::from_secs(2));
372                assert_eq!(state.max_attempts, 20);
373                assert_eq!(state.cooldown, Duration::from_secs(5));
374            }
375        }
376    }
377
378    #[tokio::test]
379    async fn test_no_exchanges() {
380        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
381            .auth_key(Some("my_api_key".into()))
382            .build()
383            .await;
384        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
385    }
386
387    #[ignore = "require tycho gateway"]
388    #[tokio::test]
389    async fn teat_simple_build() {
390        let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
391        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
392            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
393            .auth_key(Some(token))
394            .build()
395            .await;
396
397        dbg!(&receiver);
398
399        assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
400    }
401}