Skip to main content

tycho_client/
stream.rs

1use std::{
2    cmp::max,
3    collections::{HashMap, HashSet},
4    env,
5    time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::{
12    dto::{PaginationLimits, ProtocolSystemsRequestBody},
13    models::{Chain, ExtractorIdentity},
14};
15
16use crate::{
17    deltas::DeltasClient,
18    feed::{
19        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
20        BlockSynchronizer, BlockSynchronizerError, FeedMessage,
21    },
22    rpc::{HttpRPCClientOptions, ProtocolSystemsParams, RPCClient},
23    HttpRPCClient, WsDeltasClient,
24};
25
26#[derive(Error, Debug)]
27pub enum StreamError {
28    #[error("Error during stream set up: {0}")]
29    SetUpError(String),
30
31    #[error("WebSocket client connection error: {0}")]
32    WebSocketConnectionError(String),
33
34    #[error("BlockSynchronizer error: {0}")]
35    BlockSynchronizerError(String),
36}
37
38#[non_exhaustive]
39#[derive(Clone, Debug)]
40pub enum RetryConfiguration {
41    Constant(ConstantRetryConfiguration),
42}
43
44impl RetryConfiguration {
45    pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
46        RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
47    }
48}
49
50#[derive(Clone, Debug)]
51pub struct ConstantRetryConfiguration {
52    max_attempts: u64,
53    cooldown: Duration,
54}
55
56pub struct TychoStreamBuilder {
57    tycho_url: String,
58    chain: Chain,
59    exchanges: HashMap<String, ComponentFilter>,
60    blocklisted_ids: HashSet<String>,
61    block_time: u64,
62    timeout: u64,
63    startup_timeout: Duration,
64    max_missed_blocks: u64,
65    state_sync_retry_config: RetryConfiguration,
66    websockets_retry_config: RetryConfiguration,
67    no_state: bool,
68    auth_key: Option<String>,
69    no_tls: bool,
70    include_tvl: bool,
71    compression: bool,
72    partial_blocks: bool,
73    max_messages: Option<usize>,
74}
75
76impl TychoStreamBuilder {
77    /// Creates a new `TychoStreamBuilder` with the given Tycho URL and blockchain network.
78    /// Initializes the builder with default values for block time and timeout based on the chain.
79    pub fn new(tycho_url: &str, chain: Chain) -> Self {
80        let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
81        Self {
82            tycho_url: tycho_url.to_string(),
83            chain,
84            exchanges: HashMap::new(),
85            blocklisted_ids: HashSet::new(),
86            block_time,
87            timeout,
88            startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
89            max_missed_blocks,
90            state_sync_retry_config: RetryConfiguration::constant(
91                32,
92                Duration::from_secs(max(block_time / 4, 2)),
93            ),
94            websockets_retry_config: RetryConfiguration::constant(
95                128,
96                Duration::from_secs(max(block_time / 6, 1)),
97            ),
98            no_state: false,
99            auth_key: None,
100            no_tls: true,
101            include_tvl: false,
102            compression: true,
103            partial_blocks: false,
104            max_messages: None,
105        }
106    }
107
108    /// Returns the default block_time, timeout and max_missed_blocks values for the given
109    /// blockchain network.
110    fn default_timing(chain: &Chain) -> (u64, u64, u64) {
111        match chain {
112            Chain::Ethereum => (12, 36, 50),
113            Chain::Starknet => (2, 8, 50),
114            Chain::ZkSync => (3, 12, 50),
115            Chain::Arbitrum => (1, 2, 100), // Typically closer to 0.25s
116            Chain::Base => (2, 12, 50),
117            Chain::Bsc => (1, 12, 50),
118            Chain::Unichain => (1, 10, 100),
119            Chain::Polygon => (2, 12, 50), // ~2s block time
120        }
121    }
122
123    /// Adds an exchange and its corresponding filter to the Tycho client.
124    pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
125        self.exchanges
126            .insert(name.to_string(), filter);
127        self
128    }
129
130    /// Sets the block time for the Tycho client.
131    pub fn block_time(mut self, block_time: u64) -> Self {
132        self.block_time = block_time;
133        self
134    }
135
136    /// Sets the timeout duration for network operations.
137    pub fn timeout(mut self, timeout: u64) -> Self {
138        self.timeout = timeout;
139        self
140    }
141
142    pub fn startup_timeout(mut self, timeout: Duration) -> Self {
143        self.startup_timeout = timeout;
144        self
145    }
146
147    pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
148        self.max_missed_blocks = max_missed_blocks;
149        self
150    }
151
152    pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
153        self.websockets_retry_config = retry_config.clone();
154        self.warn_on_potential_timing_issues();
155        self
156    }
157
158    pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
159        self.state_sync_retry_config = retry_config.clone();
160        self.warn_on_potential_timing_issues();
161        self
162    }
163
164    fn warn_on_potential_timing_issues(&self) {
165        let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
166            (&self.state_sync_retry_config, &self.websockets_retry_config);
167
168        if ws_config.cooldown >= state_config.cooldown {
169            warn!(
170                "Websocket cooldown should be < than state syncronizer cooldown \
171                to avoid spending retries due to disconnected websocket."
172            )
173        }
174    }
175
176    /// Configures the client to exclude state updates from the stream.
177    pub fn no_state(mut self, no_state: bool) -> Self {
178        self.no_state = no_state;
179        self
180    }
181
182    /// Sets the API key for authenticating with the Tycho server.
183    ///
184    /// Optionally you can set the TYCHO_AUTH_TOKEN env var instead. Make sure to set no_tsl
185    /// to false if you do this.
186    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
187        self.auth_key = auth_key;
188        self.no_tls = false;
189        self
190    }
191
192    /// Disables TLS/SSL for the connection, using `http` and `ws` protocols.
193    pub fn no_tls(mut self, no_tls: bool) -> Self {
194        self.no_tls = no_tls;
195        self
196    }
197
198    /// Configures the client to include TVL in the stream.
199    ///
200    /// If set to true, this will increase start-up time due to additional requests.
201    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
202        self.include_tvl = include_tvl;
203        self
204    }
205
206    /// Disables compression for RPC and WebSocket communication.
207    /// By default, messages are compressed using zstd.
208    pub fn disable_compression(mut self) -> Self {
209        self.compression = false;
210        self
211    }
212
213    /// Enables the client to receive partial block updates (flashblocks).
214    pub fn enable_partial_blocks(mut self) -> Self {
215        self.partial_blocks = true;
216        self
217    }
218
219    /// Stops the stream after emitting this many messages. Useful for testing or
220    /// triggering a periodic restart after a fixed number of blocks.
221    pub fn max_messages(mut self, n: usize) -> Self {
222        self.max_messages = Some(n);
223        self
224    }
225
226    /// Overrides the maximum number of retry attempts for state synchronizer startup.
227    /// The retry cooldown is derived from the chain's block time and is not affected.
228    pub fn max_retries(mut self, max_retries: u64) -> Self {
229        let cooldown = match &self.state_sync_retry_config {
230            RetryConfiguration::Constant(c) => c.cooldown,
231        };
232        self.state_sync_retry_config = RetryConfiguration::constant(max_retries, cooldown);
233        self
234    }
235
236    /// Blocklist specific component IDs across all registered exchanges.
237    ///
238    /// Blocklisted components are never tracked, regardless of TVL or other
239    /// filter criteria.
240    pub fn blocklisted_ids(mut self, ids: impl IntoIterator<Item = String>) -> Self {
241        self.blocklisted_ids.extend(ids);
242        self
243    }
244
245    /// Builds and starts the Tycho client, connecting to the Tycho server and
246    /// setting up the synchronization of exchange components.
247    pub async fn build(
248        self,
249    ) -> Result<
250        (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
251        StreamError,
252    > {
253        if self.exchanges.is_empty() {
254            return Err(StreamError::SetUpError(
255                "At least one exchange must be registered.".to_string(),
256            ));
257        }
258
259        // Attempt to read the authentication key from the environment variable if not provided
260        let auth_key = self
261            .auth_key
262            .clone()
263            .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
264
265        info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
266
267        // Determine the URLs based on the TLS setting
268        let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
269            info!("Using non-secure connection: ws:// and http://");
270            let tycho_ws_url = format!("ws://{}", self.tycho_url);
271            let tycho_rpc_url = format!("http://{}", self.tycho_url);
272            (tycho_ws_url, tycho_rpc_url)
273        } else {
274            info!("Using secure connection: wss:// and https://");
275            let tycho_ws_url = format!("wss://{}", self.tycho_url);
276            let tycho_rpc_url = format!("https://{}", self.tycho_url);
277            (tycho_ws_url, tycho_rpc_url)
278        };
279
280        // Initialize the WebSocket client
281        let ws_client = match &self.websockets_retry_config {
282            RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
283                &tycho_ws_url,
284                auth_key.as_deref(),
285                config.max_attempts,
286                config.cooldown,
287            ),
288        }
289        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
290        let rpc_client = HttpRPCClient::new(
291            &tycho_rpc_url,
292            HttpRPCClientOptions::new()
293                .with_auth_key(auth_key)
294                .with_compression(self.compression),
295        )
296        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
297        let ws_jh = ws_client
298            .connect()
299            .await
300            .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
301
302        // Create and configure the BlockSynchronizer
303        let mut block_sync = BlockSynchronizer::new(
304            Duration::from_secs(self.block_time),
305            Duration::from_secs(self.timeout),
306            self.max_missed_blocks,
307        );
308        if let Some(n) = self.max_messages {
309            block_sync.max_messages(n);
310        }
311
312        let requested: HashSet<_> = self.exchanges.keys().cloned().collect();
313        let info = ProtocolSystemsInfo::fetch(&rpc_client, self.chain, &requested).await;
314        info.log_other_available();
315        let dci_protocols = info.dci_protocols;
316
317        // Register each exchange with the BlockSynchronizer
318        for (name, filter) in self
319            .exchanges
320            .into_iter()
321            .map(|(name, filter)| {
322                let filter = if self.blocklisted_ids.is_empty() {
323                    filter
324                } else {
325                    filter.blocklist(self.blocklisted_ids.iter().cloned())
326                };
327                (name, filter)
328            })
329        {
330            info!("Registering exchange: {}", name);
331            let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
332            let uses_dci = dci_protocols.contains(&name);
333            let sync = match &self.state_sync_retry_config {
334                RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
335                    id.clone(),
336                    true,
337                    filter,
338                    retry_config.max_attempts,
339                    retry_config.cooldown,
340                    !self.no_state,
341                    self.include_tvl,
342                    self.compression,
343                    rpc_client.clone(),
344                    ws_client.clone(),
345                    self.block_time + self.timeout,
346                )
347                .with_dci(uses_dci)
348                .with_partial_blocks(self.partial_blocks),
349            };
350            block_sync = block_sync.register_synchronizer(id, sync);
351        }
352
353        // Start the BlockSynchronizer and monitor for disconnections
354        let (sync_jh, rx) = block_sync
355            .run()
356            .await
357            .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
358
359        // Monitor WebSocket and BlockSynchronizer futures
360        let handle = tokio::spawn(async move {
361            tokio::select! {
362                res = ws_jh => {
363                    let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
364                }
365                res = sync_jh => {
366                    res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
367                }
368            }
369            if let Err(e) = ws_client.close().await {
370                warn!(?e, "Failed to close WebSocket client");
371            }
372        });
373
374        Ok((handle, rx))
375    }
376}
377
378/// Result of fetching protocol systems: which protocols use DCI, and which
379/// available protocols on the server were not requested by the client.
380pub struct ProtocolSystemsInfo {
381    pub dci_protocols: HashSet<String>,
382    pub other_available: HashSet<String>,
383}
384
385impl ProtocolSystemsInfo {
386    /// Fetches protocol systems from the server and classifies them: which use DCI,
387    /// and which are available but not in `requested_exchanges`.
388    pub async fn fetch(
389        rpc_client: &HttpRPCClient,
390        chain: Chain,
391        requested_exchanges: &HashSet<String>,
392    ) -> Self {
393        let page_size =
394            ProtocolSystemsRequestBody::effective_max_page_size(rpc_client.compression());
395        let params = ProtocolSystemsParams::new(chain).with_pagination(0, page_size);
396        let response = rpc_client
397            .get_protocol_systems(params)
398            .await
399            .map_err(|e| {
400                warn!(
401                    "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
402                );
403                e
404            })
405            .ok();
406
407        let Some(response) = response else {
408            return Self { dci_protocols: HashSet::new(), other_available: HashSet::new() };
409        };
410
411        if response.total() > page_size {
412            warn!(
413                "Server has {} protocol systems but only {} were fetched (page_size={page_size}). \
414                 Availability info may be incomplete.",
415                response.total(),
416                response.data().protocol_systems().len(),
417            );
418        }
419
420        let available: HashSet<_> = response
421            .data()
422            .protocol_systems()
423            .iter()
424            .cloned()
425            .collect();
426        let other_available = available
427            .difference(requested_exchanges)
428            .cloned()
429            .collect();
430        let mut dci_protocols: HashSet<String> = response
431            .data()
432            .dci_protocols()
433            .iter()
434            .cloned()
435            .collect();
436
437        // TODO(ENG-5302): Remove this fallback once all environments serve
438        // the `dci_protocols` field. Old servers omit the field, which
439        // deserialises as empty — causing clients to skip entrypoint
440        // fetches for DCI protocols.
441        if dci_protocols.is_empty() {
442            const LEGACY_DCI: &[&str] = &[
443                "uniswap_v4_hooks",
444                "vm:curve",
445                "vm:balancer_v2",
446                "vm:balancer_v3",
447                "fluid_v1",
448                "erc4626",
449            ];
450            for name in requested_exchanges {
451                if LEGACY_DCI.contains(&name.as_str()) {
452                    dci_protocols.insert(name.clone());
453                }
454            }
455        }
456
457        Self { dci_protocols, other_available }
458    }
459
460    /// Logs the protocols available on the server that the client didn't subscribe to.
461    pub fn log_other_available(&self) {
462        if !self.other_available.is_empty() {
463            let names: Vec<_> = self
464                .other_available
465                .iter()
466                .cloned()
467                .collect();
468            info!("Other available protocols: {}", names.join(", "));
469        }
470    }
471}
472
473#[cfg(test)]
474mod tests {
475    use super::*;
476
477    #[test]
478    fn test_retry_configuration_constant() {
479        let config = RetryConfiguration::constant(5, Duration::from_secs(10));
480        match config {
481            RetryConfiguration::Constant(c) => {
482                assert_eq!(c.max_attempts, 5);
483                assert_eq!(c.cooldown, Duration::from_secs(10));
484            }
485        }
486    }
487
488    #[test]
489    fn test_stream_builder_retry_configs() {
490        let mut builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
491        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
492        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));
493
494        builder = builder
495            .websockets_retry_config(&ws_config)
496            .state_synchronizer_retry_config(&state_config);
497
498        // Verify configs are stored correctly by checking they match expected values
499        match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
500            (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
501                assert_eq!(ws.max_attempts, 10);
502                assert_eq!(ws.cooldown, Duration::from_secs(2));
503                assert_eq!(state.max_attempts, 20);
504                assert_eq!(state.cooldown, Duration::from_secs(5));
505            }
506        }
507    }
508
509    #[test]
510    fn test_default_stream_builder() {
511        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
512        assert!(builder.compression, "Compression should be enabled by default.");
513        assert!(!builder.partial_blocks, "partial_blocks should be disabled by default.");
514    }
515
516    #[tokio::test]
517    async fn test_no_exchanges() {
518        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
519            .auth_key(Some("my_api_key".into()))
520            .build()
521            .await;
522        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
523    }
524
525    #[ignore = "require tycho gateway"]
526    #[tokio::test]
527    async fn test_simple_build() {
528        let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
529        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
530            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
531            .auth_key(Some(token))
532            .build()
533            .await;
534
535        dbg!(&receiver);
536
537        assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
538    }
539}