Skip to main content

tycho_client/
stream.rs

1use std::{
2    cmp::max,
3    collections::{HashMap, HashSet},
4    env,
5    time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{
12    Chain, ExtractorIdentity, PaginationLimits, PaginationParams, ProtocolSystemsRequestBody,
13};
14
15use crate::{
16    deltas::DeltasClient,
17    feed::{
18        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
19        BlockSynchronizer, BlockSynchronizerError, FeedMessage,
20    },
21    rpc::{HttpRPCClientOptions, RPCClient},
22    HttpRPCClient, WsDeltasClient,
23};
24
25#[derive(Error, Debug)]
26pub enum StreamError {
27    #[error("Error during stream set up: {0}")]
28    SetUpError(String),
29
30    #[error("WebSocket client connection error: {0}")]
31    WebSocketConnectionError(String),
32
33    #[error("BlockSynchronizer error: {0}")]
34    BlockSynchronizerError(String),
35}
36
37#[non_exhaustive]
38#[derive(Clone, Debug)]
39pub enum RetryConfiguration {
40    Constant(ConstantRetryConfiguration),
41}
42
43impl RetryConfiguration {
44    pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
45        RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
46    }
47}
48
49#[derive(Clone, Debug)]
50pub struct ConstantRetryConfiguration {
51    max_attempts: u64,
52    cooldown: Duration,
53}
54
55pub struct TychoStreamBuilder {
56    tycho_url: String,
57    chain: Chain,
58    exchanges: HashMap<String, ComponentFilter>,
59    blocklisted_ids: HashSet<String>,
60    block_time: u64,
61    timeout: u64,
62    startup_timeout: Duration,
63    max_missed_blocks: u64,
64    state_sync_retry_config: RetryConfiguration,
65    websockets_retry_config: RetryConfiguration,
66    no_state: bool,
67    auth_key: Option<String>,
68    no_tls: bool,
69    include_tvl: bool,
70    compression: bool,
71    partial_blocks: bool,
72}
73
74impl TychoStreamBuilder {
75    /// Creates a new `TychoStreamBuilder` with the given Tycho URL and blockchain network.
76    /// Initializes the builder with default values for block time and timeout based on the chain.
77    pub fn new(tycho_url: &str, chain: Chain) -> Self {
78        let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
79        Self {
80            tycho_url: tycho_url.to_string(),
81            chain,
82            exchanges: HashMap::new(),
83            blocklisted_ids: HashSet::new(),
84            block_time,
85            timeout,
86            startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
87            max_missed_blocks,
88            state_sync_retry_config: RetryConfiguration::constant(
89                32,
90                Duration::from_secs(max(block_time / 4, 2)),
91            ),
92            websockets_retry_config: RetryConfiguration::constant(
93                128,
94                Duration::from_secs(max(block_time / 6, 1)),
95            ),
96            no_state: false,
97            auth_key: None,
98            no_tls: true,
99            include_tvl: false,
100            compression: true,
101            partial_blocks: false,
102        }
103    }
104
105    /// Returns the default block_time, timeout and max_missed_blocks values for the given
106    /// blockchain network.
107    fn default_timing(chain: &Chain) -> (u64, u64, u64) {
108        match chain {
109            Chain::Ethereum => (12, 36, 50),
110            Chain::Starknet => (2, 8, 50),
111            Chain::ZkSync => (3, 12, 50),
112            Chain::Arbitrum => (1, 2, 100), // Typically closer to 0.25s
113            Chain::Base => (2, 12, 50),
114            Chain::Bsc => (1, 12, 50),
115            Chain::Unichain => (1, 10, 100),
116            Chain::Polygon => (2, 12, 50), // ~2s block time
117        }
118    }
119
120    /// Adds an exchange and its corresponding filter to the Tycho client.
121    pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
122        self.exchanges
123            .insert(name.to_string(), filter);
124        self
125    }
126
127    /// Sets the block time for the Tycho client.
128    pub fn block_time(mut self, block_time: u64) -> Self {
129        self.block_time = block_time;
130        self
131    }
132
133    /// Sets the timeout duration for network operations.
134    pub fn timeout(mut self, timeout: u64) -> Self {
135        self.timeout = timeout;
136        self
137    }
138
139    pub fn startup_timeout(mut self, timeout: Duration) -> Self {
140        self.startup_timeout = timeout;
141        self
142    }
143
144    pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
145        self.max_missed_blocks = max_missed_blocks;
146        self
147    }
148
149    pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
150        self.websockets_retry_config = retry_config.clone();
151        self.warn_on_potential_timing_issues();
152        self
153    }
154
155    pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
156        self.state_sync_retry_config = retry_config.clone();
157        self.warn_on_potential_timing_issues();
158        self
159    }
160
161    fn warn_on_potential_timing_issues(&self) {
162        let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
163            (&self.state_sync_retry_config, &self.websockets_retry_config);
164
165        if ws_config.cooldown >= state_config.cooldown {
166            warn!(
167                "Websocket cooldown should be < than state syncronizer cooldown \
168                to avoid spending retries due to disconnected websocket."
169            )
170        }
171    }
172
173    /// Configures the client to exclude state updates from the stream.
174    pub fn no_state(mut self, no_state: bool) -> Self {
175        self.no_state = no_state;
176        self
177    }
178
179    /// Sets the API key for authenticating with the Tycho server.
180    ///
181    /// Optionally you can set the TYCHO_AUTH_TOKEN env var instead. Make sure to set no_tsl
182    /// to false if you do this.
183    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
184        self.auth_key = auth_key;
185        self.no_tls = false;
186        self
187    }
188
189    /// Disables TLS/SSL for the connection, using `http` and `ws` protocols.
190    pub fn no_tls(mut self, no_tls: bool) -> Self {
191        self.no_tls = no_tls;
192        self
193    }
194
195    /// Configures the client to include TVL in the stream.
196    ///
197    /// If set to true, this will increase start-up time due to additional requests.
198    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
199        self.include_tvl = include_tvl;
200        self
201    }
202
203    /// Disables compression for RPC and WebSocket communication.
204    /// By default, messages are compressed using zstd.
205    pub fn disable_compression(mut self) -> Self {
206        self.compression = false;
207        self
208    }
209
210    /// Enables the client to receive partial block updates (flashblocks).
211    pub fn enable_partial_blocks(mut self) -> Self {
212        self.partial_blocks = true;
213        self
214    }
215
216    /// Blocklist specific component IDs across all registered exchanges.
217    ///
218    /// Blocklisted components are never tracked, regardless of TVL or other
219    /// filter criteria.
220    pub fn blocklisted_ids(mut self, ids: impl IntoIterator<Item = String>) -> Self {
221        self.blocklisted_ids.extend(ids);
222        self
223    }
224
225    /// Builds and starts the Tycho client, connecting to the Tycho server and
226    /// setting up the synchronization of exchange components.
227    pub async fn build(
228        self,
229    ) -> Result<
230        (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
231        StreamError,
232    > {
233        if self.exchanges.is_empty() {
234            return Err(StreamError::SetUpError(
235                "At least one exchange must be registered.".to_string(),
236            ));
237        }
238
239        // Attempt to read the authentication key from the environment variable if not provided
240        let auth_key = self
241            .auth_key
242            .clone()
243            .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
244
245        info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
246
247        // Determine the URLs based on the TLS setting
248        let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
249            info!("Using non-secure connection: ws:// and http://");
250            let tycho_ws_url = format!("ws://{}", self.tycho_url);
251            let tycho_rpc_url = format!("http://{}", self.tycho_url);
252            (tycho_ws_url, tycho_rpc_url)
253        } else {
254            info!("Using secure connection: wss:// and https://");
255            let tycho_ws_url = format!("wss://{}", self.tycho_url);
256            let tycho_rpc_url = format!("https://{}", self.tycho_url);
257            (tycho_ws_url, tycho_rpc_url)
258        };
259
260        // Initialize the WebSocket client
261        let ws_client = match &self.websockets_retry_config {
262            RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
263                &tycho_ws_url,
264                auth_key.as_deref(),
265                config.max_attempts,
266                config.cooldown,
267            ),
268        }
269        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
270        let rpc_client = HttpRPCClient::new(
271            &tycho_rpc_url,
272            HttpRPCClientOptions::new()
273                .with_auth_key(auth_key)
274                .with_compression(self.compression),
275        )
276        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
277        let ws_jh = ws_client
278            .connect()
279            .await
280            .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
281
282        // Create and configure the BlockSynchronizer
283        let mut block_sync = BlockSynchronizer::new(
284            Duration::from_secs(self.block_time),
285            Duration::from_secs(self.timeout),
286            self.max_missed_blocks,
287        );
288
289        let requested: HashSet<_> = self.exchanges.keys().cloned().collect();
290        let info = ProtocolSystemsInfo::fetch(&rpc_client, self.chain, &requested).await;
291        info.log_other_available();
292        let dci_protocols = info.dci_protocols;
293
294        // Register each exchange with the BlockSynchronizer
295        for (name, filter) in self
296            .exchanges
297            .into_iter()
298            .map(|(name, filter)| {
299                let filter = if self.blocklisted_ids.is_empty() {
300                    filter
301                } else {
302                    filter.blocklist(self.blocklisted_ids.iter().cloned())
303                };
304                (name, filter)
305            })
306        {
307            info!("Registering exchange: {}", name);
308            let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
309            let uses_dci = dci_protocols.contains(&name);
310            let sync = match &self.state_sync_retry_config {
311                RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
312                    id.clone(),
313                    true,
314                    filter,
315                    retry_config.max_attempts,
316                    retry_config.cooldown,
317                    !self.no_state,
318                    self.include_tvl,
319                    self.compression,
320                    rpc_client.clone(),
321                    ws_client.clone(),
322                    self.block_time + self.timeout,
323                )
324                .with_dci(uses_dci)
325                .with_partial_blocks(self.partial_blocks),
326            };
327            block_sync = block_sync.register_synchronizer(id, sync);
328        }
329
330        // Start the BlockSynchronizer and monitor for disconnections
331        let (sync_jh, rx) = block_sync
332            .run()
333            .await
334            .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
335
336        // Monitor WebSocket and BlockSynchronizer futures
337        let handle = tokio::spawn(async move {
338            tokio::select! {
339                res = ws_jh => {
340                    let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
341                }
342                res = sync_jh => {
343                    res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
344                }
345            }
346            if let Err(e) = ws_client.close().await {
347                warn!(?e, "Failed to close WebSocket client");
348            }
349        });
350
351        Ok((handle, rx))
352    }
353}
354
355/// Result of fetching protocol systems: which protocols use DCI, and which
356/// available protocols on the server were not requested by the client.
357pub struct ProtocolSystemsInfo {
358    pub dci_protocols: HashSet<String>,
359    pub other_available: HashSet<String>,
360}
361
362impl ProtocolSystemsInfo {
363    /// Fetches protocol systems from the server and classifies them: which use DCI,
364    /// and which are available but not in `requested_exchanges`.
365    pub async fn fetch(
366        rpc_client: &HttpRPCClient,
367        chain: Chain,
368        requested_exchanges: &HashSet<String>,
369    ) -> Self {
370        let page_size =
371            ProtocolSystemsRequestBody::effective_max_page_size(rpc_client.compression());
372        let response = rpc_client
373            .get_protocol_systems(&ProtocolSystemsRequestBody {
374                chain,
375                pagination: PaginationParams { page: 0, page_size },
376            })
377            .await
378            .map_err(|e| {
379                warn!(
380                    "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
381                );
382                e
383            })
384            .ok();
385
386        let Some(response) = response else {
387            return Self { dci_protocols: HashSet::new(), other_available: HashSet::new() };
388        };
389
390        if response.pagination.total > page_size {
391            warn!(
392                "Server has {} protocol systems but only {} were fetched (page_size={page_size}). \
393                 Availability info may be incomplete.",
394                response.pagination.total,
395                response.protocol_systems.len(),
396            );
397        }
398
399        let available: HashSet<_> = response
400            .protocol_systems
401            .into_iter()
402            .collect();
403        let other_available = available
404            .difference(requested_exchanges)
405            .cloned()
406            .collect();
407        let mut dci_protocols: HashSet<String> = response
408            .dci_protocols
409            .into_iter()
410            .collect();
411
412        // TODO(ENG-5302): Remove this fallback once all environments serve
413        // the `dci_protocols` field. Old servers omit the field, which
414        // deserialises as empty — causing clients to skip entrypoint
415        // fetches for DCI protocols.
416        if dci_protocols.is_empty() {
417            const LEGACY_DCI: &[&str] = &[
418                "uniswap_v4_hooks",
419                "vm:curve",
420                "vm:balancer_v2",
421                "vm:balancer_v3",
422                "fluid_v1",
423                "erc4626",
424            ];
425            for name in requested_exchanges {
426                if LEGACY_DCI.contains(&name.as_str()) {
427                    dci_protocols.insert(name.clone());
428                }
429            }
430        }
431
432        Self { dci_protocols, other_available }
433    }
434
435    /// Logs the protocols available on the server that the client didn't subscribe to.
436    pub fn log_other_available(&self) {
437        if !self.other_available.is_empty() {
438            let names: Vec<_> = self
439                .other_available
440                .iter()
441                .cloned()
442                .collect();
443            info!("Other available protocols: {}", names.join(", "));
444        }
445    }
446}
447
448#[cfg(test)]
449mod tests {
450    use super::*;
451
452    #[test]
453    fn test_retry_configuration_constant() {
454        let config = RetryConfiguration::constant(5, Duration::from_secs(10));
455        match config {
456            RetryConfiguration::Constant(c) => {
457                assert_eq!(c.max_attempts, 5);
458                assert_eq!(c.cooldown, Duration::from_secs(10));
459            }
460        }
461    }
462
463    #[test]
464    fn test_stream_builder_retry_configs() {
465        let mut builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
466        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
467        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));
468
469        builder = builder
470            .websockets_retry_config(&ws_config)
471            .state_synchronizer_retry_config(&state_config);
472
473        // Verify configs are stored correctly by checking they match expected values
474        match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
475            (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
476                assert_eq!(ws.max_attempts, 10);
477                assert_eq!(ws.cooldown, Duration::from_secs(2));
478                assert_eq!(state.max_attempts, 20);
479                assert_eq!(state.cooldown, Duration::from_secs(5));
480            }
481        }
482    }
483
484    #[test]
485    fn test_default_stream_builder() {
486        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
487        assert!(builder.compression, "Compression should be enabled by default.");
488        assert!(!builder.partial_blocks, "partial_blocks should be disabled by default.");
489    }
490
491    #[tokio::test]
492    async fn test_no_exchanges() {
493        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
494            .auth_key(Some("my_api_key".into()))
495            .build()
496            .await;
497        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
498    }
499
500    #[ignore = "require tycho gateway"]
501    #[tokio::test]
502    async fn test_simple_build() {
503        let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
504        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
505            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
506            .auth_key(Some(token))
507            .build()
508            .await;
509
510        dbg!(&receiver);
511
512        assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
513    }
514}