Skip to main content

tycho_client/
stream.rs

1use std::{
2    cmp::max,
3    collections::{HashMap, HashSet},
4    env,
5    time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{
12    Chain, ExtractorIdentity, PaginationLimits, PaginationParams, ProtocolSystemsRequestBody,
13};
14
15use crate::{
16    deltas::DeltasClient,
17    feed::{
18        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
19        BlockSynchronizer, BlockSynchronizerError, FeedMessage,
20    },
21    rpc::{HttpRPCClientOptions, RPCClient},
22    HttpRPCClient, WsDeltasClient,
23};
24
25#[derive(Error, Debug)]
26pub enum StreamError {
27    #[error("Error during stream set up: {0}")]
28    SetUpError(String),
29
30    #[error("WebSocket client connection error: {0}")]
31    WebSocketConnectionError(String),
32
33    #[error("BlockSynchronizer error: {0}")]
34    BlockSynchronizerError(String),
35}
36
/// Retry strategy handed to the WebSocket client and to the state synchronizers.
///
/// The enum is `#[non_exhaustive]`, so downstream crates must not rely on `Constant` being the
/// only strategy.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum RetryConfiguration {
    /// A fixed attempt budget paired with a fixed cooldown.
    Constant(ConstantRetryConfiguration),
}

impl RetryConfiguration {
    /// Creates a [`RetryConfiguration::Constant`] from an attempt budget and a cooldown.
    pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
        let settings = ConstantRetryConfiguration { max_attempts, cooldown };
        Self::Constant(settings)
    }
}

/// Parameters of the constant retry strategy.
#[derive(Debug, Clone)]
pub struct ConstantRetryConfiguration {
    /// Attempt budget forwarded to the component being configured.
    max_attempts: u64,
    /// Cooldown forwarded to the component being configured.
    cooldown: Duration,
}
54
55pub struct TychoStreamBuilder {
56    tycho_url: String,
57    chain: Chain,
58    exchanges: HashMap<String, ComponentFilter>,
59    block_time: u64,
60    timeout: u64,
61    startup_timeout: Duration,
62    max_missed_blocks: u64,
63    state_sync_retry_config: RetryConfiguration,
64    websockets_retry_config: RetryConfiguration,
65    no_state: bool,
66    auth_key: Option<String>,
67    no_tls: bool,
68    include_tvl: bool,
69    compression: bool,
70    partial_blocks: bool,
71}
72
73impl TychoStreamBuilder {
74    /// Creates a new `TychoStreamBuilder` with the given Tycho URL and blockchain network.
75    /// Initializes the builder with default values for block time and timeout based on the chain.
76    pub fn new(tycho_url: &str, chain: Chain) -> Self {
77        let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
78        Self {
79            tycho_url: tycho_url.to_string(),
80            chain,
81            exchanges: HashMap::new(),
82            block_time,
83            timeout,
84            startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
85            max_missed_blocks,
86            state_sync_retry_config: RetryConfiguration::constant(
87                32,
88                Duration::from_secs(max(block_time / 4, 2)),
89            ),
90            websockets_retry_config: RetryConfiguration::constant(
91                128,
92                Duration::from_secs(max(block_time / 6, 1)),
93            ),
94            no_state: false,
95            auth_key: None,
96            no_tls: true,
97            include_tvl: false,
98            compression: true,
99            partial_blocks: false,
100        }
101    }
102
103    /// Returns the default block_time, timeout and max_missed_blocks values for the given
104    /// blockchain network.
105    fn default_timing(chain: &Chain) -> (u64, u64, u64) {
106        match chain {
107            Chain::Ethereum => (12, 36, 50),
108            Chain::Starknet => (2, 8, 50),
109            Chain::ZkSync => (3, 12, 50),
110            Chain::Arbitrum => (1, 2, 100), // Typically closer to 0.25s
111            Chain::Base => (2, 12, 50),
112            Chain::Bsc => (1, 12, 50),
113            Chain::Unichain => (1, 10, 100),
114        }
115    }
116
117    /// Adds an exchange and its corresponding filter to the Tycho client.
118    pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
119        self.exchanges
120            .insert(name.to_string(), filter);
121        self
122    }
123
124    /// Sets the block time for the Tycho client.
125    pub fn block_time(mut self, block_time: u64) -> Self {
126        self.block_time = block_time;
127        self
128    }
129
130    /// Sets the timeout duration for network operations.
131    pub fn timeout(mut self, timeout: u64) -> Self {
132        self.timeout = timeout;
133        self
134    }
135
136    pub fn startup_timeout(mut self, timeout: Duration) -> Self {
137        self.startup_timeout = timeout;
138        self
139    }
140
141    pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
142        self.max_missed_blocks = max_missed_blocks;
143        self
144    }
145
146    pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
147        self.websockets_retry_config = retry_config.clone();
148        self.warn_on_potential_timing_issues();
149        self
150    }
151
152    pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
153        self.state_sync_retry_config = retry_config.clone();
154        self.warn_on_potential_timing_issues();
155        self
156    }
157
158    fn warn_on_potential_timing_issues(&self) {
159        let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
160            (&self.state_sync_retry_config, &self.websockets_retry_config);
161
162        if ws_config.cooldown >= state_config.cooldown {
163            warn!(
164                "Websocket cooldown should be < than state syncronizer cooldown \
165                to avoid spending retries due to disconnected websocket."
166            )
167        }
168    }
169
170    /// Configures the client to exclude state updates from the stream.
171    pub fn no_state(mut self, no_state: bool) -> Self {
172        self.no_state = no_state;
173        self
174    }
175
176    /// Sets the API key for authenticating with the Tycho server.
177    ///
178    /// Optionally you can set the TYCHO_AUTH_TOKEN env var instead. Make sure to set no_tsl
179    /// to false if you do this.
180    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
181        self.auth_key = auth_key;
182        self.no_tls = false;
183        self
184    }
185
186    /// Disables TLS/SSL for the connection, using `http` and `ws` protocols.
187    pub fn no_tls(mut self, no_tls: bool) -> Self {
188        self.no_tls = no_tls;
189        self
190    }
191
192    /// Configures the client to include TVL in the stream.
193    ///
194    /// If set to true, this will increase start-up time due to additional requests.
195    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
196        self.include_tvl = include_tvl;
197        self
198    }
199
200    /// Disables compression for RPC and WebSocket communication.
201    /// By default, messages are compressed using zstd.
202    pub fn disable_compression(mut self) -> Self {
203        self.compression = false;
204        self
205    }
206
207    /// Enables the client to receive partial block updates (flashblocks).
208    pub fn enable_partial_blocks(mut self) -> Self {
209        self.partial_blocks = true;
210        self
211    }
212
213    /// Builds and starts the Tycho client, connecting to the Tycho server and
214    /// setting up the synchronization of exchange components.
215    pub async fn build(
216        self,
217    ) -> Result<
218        (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
219        StreamError,
220    > {
221        if self.exchanges.is_empty() {
222            return Err(StreamError::SetUpError(
223                "At least one exchange must be registered.".to_string(),
224            ));
225        }
226
227        // Attempt to read the authentication key from the environment variable if not provided
228        let auth_key = self
229            .auth_key
230            .clone()
231            .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
232
233        info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
234
235        // Determine the URLs based on the TLS setting
236        let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
237            info!("Using non-secure connection: ws:// and http://");
238            let tycho_ws_url = format!("ws://{}", self.tycho_url);
239            let tycho_rpc_url = format!("http://{}", self.tycho_url);
240            (tycho_ws_url, tycho_rpc_url)
241        } else {
242            info!("Using secure connection: wss:// and https://");
243            let tycho_ws_url = format!("wss://{}", self.tycho_url);
244            let tycho_rpc_url = format!("https://{}", self.tycho_url);
245            (tycho_ws_url, tycho_rpc_url)
246        };
247
248        // Initialize the WebSocket client
249        let ws_client = match &self.websockets_retry_config {
250            RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
251                &tycho_ws_url,
252                auth_key.as_deref(),
253                config.max_attempts,
254                config.cooldown,
255            ),
256        }
257        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
258        let rpc_client = HttpRPCClient::new(
259            &tycho_rpc_url,
260            HttpRPCClientOptions::new()
261                .with_auth_key(auth_key)
262                .with_compression(self.compression),
263        )
264        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
265        let ws_jh = ws_client
266            .connect()
267            .await
268            .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
269
270        // Create and configure the BlockSynchronizer
271        let mut block_sync = BlockSynchronizer::new(
272            Duration::from_secs(self.block_time),
273            Duration::from_secs(self.timeout),
274            self.max_missed_blocks,
275        );
276
277        let requested: HashSet<_> = self.exchanges.keys().cloned().collect();
278        let info = ProtocolSystemsInfo::fetch(&rpc_client, self.chain, &requested).await;
279        info.log_other_available();
280        let dci_protocols = info.dci_protocols;
281
282        // Register each exchange with the BlockSynchronizer
283        for (name, filter) in self.exchanges {
284            info!("Registering exchange: {}", name);
285            let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
286            let uses_dci = dci_protocols.contains(&name);
287            let sync = match &self.state_sync_retry_config {
288                RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
289                    id.clone(),
290                    true,
291                    filter,
292                    retry_config.max_attempts,
293                    retry_config.cooldown,
294                    !self.no_state,
295                    self.include_tvl,
296                    self.compression,
297                    rpc_client.clone(),
298                    ws_client.clone(),
299                    self.block_time + self.timeout,
300                )
301                .with_dci(uses_dci)
302                .with_partial_blocks(self.partial_blocks),
303            };
304            block_sync = block_sync.register_synchronizer(id, sync);
305        }
306
307        // Start the BlockSynchronizer and monitor for disconnections
308        let (sync_jh, rx) = block_sync
309            .run()
310            .await
311            .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
312
313        // Monitor WebSocket and BlockSynchronizer futures
314        let handle = tokio::spawn(async move {
315            tokio::select! {
316                res = ws_jh => {
317                    let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
318                }
319                res = sync_jh => {
320                    res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
321                }
322            }
323            if let Err(e) = ws_client.close().await {
324                warn!(?e, "Failed to close WebSocket client");
325            }
326        });
327
328        Ok((handle, rx))
329    }
330}
331
/// Classification of the protocol systems reported by the server.
///
/// Records which protocols use DCI and which protocols the server offers that the client did
/// not request.
pub struct ProtocolSystemsInfo {
    /// Names of the protocols that use DCI.
    pub dci_protocols: HashSet<String>,
    /// Names of the protocols available on the server that were not requested.
    pub other_available: HashSet<String>,
}
338
339impl ProtocolSystemsInfo {
340    /// Fetches protocol systems from the server and classifies them: which use DCI,
341    /// and which are available but not in `requested_exchanges`.
342    pub async fn fetch(
343        rpc_client: &HttpRPCClient,
344        chain: Chain,
345        requested_exchanges: &HashSet<String>,
346    ) -> Self {
347        let page_size =
348            ProtocolSystemsRequestBody::effective_max_page_size(rpc_client.compression());
349        let response = rpc_client
350            .get_protocol_systems(&ProtocolSystemsRequestBody {
351                chain,
352                pagination: PaginationParams { page: 0, page_size },
353            })
354            .await
355            .map_err(|e| {
356                warn!(
357                    "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
358                );
359                e
360            })
361            .ok();
362
363        let Some(response) = response else {
364            return Self { dci_protocols: HashSet::new(), other_available: HashSet::new() };
365        };
366
367        if response.pagination.total > page_size {
368            warn!(
369                "Server has {} protocol systems but only {} were fetched (page_size={page_size}). \
370                 Availability info may be incomplete.",
371                response.pagination.total,
372                response.protocol_systems.len(),
373            );
374        }
375
376        let available: HashSet<_> = response
377            .protocol_systems
378            .into_iter()
379            .collect();
380        let other_available = available
381            .difference(requested_exchanges)
382            .cloned()
383            .collect();
384        let mut dci_protocols: HashSet<String> = response
385            .dci_protocols
386            .into_iter()
387            .collect();
388
389        // TODO(ENG-5302): Remove this fallback once all environments serve
390        // the `dci_protocols` field. Old servers omit the field, which
391        // deserialises as empty — causing clients to skip entrypoint
392        // fetches for DCI protocols.
393        if dci_protocols.is_empty() {
394            const LEGACY_DCI: &[&str] = &[
395                "uniswap_v4_hooks",
396                "vm:curve",
397                "vm:balancer_v2",
398                "vm:balancer_v3",
399                "fluid_v1",
400                "erc4626",
401            ];
402            for name in requested_exchanges {
403                if LEGACY_DCI.contains(&name.as_str()) {
404                    dci_protocols.insert(name.clone());
405                }
406            }
407        }
408
409        Self { dci_protocols, other_available }
410    }
411
412    /// Logs the protocols available on the server that the client didn't subscribe to.
413    pub fn log_other_available(&self) {
414        if !self.other_available.is_empty() {
415            let names: Vec<_> = self
416                .other_available
417                .iter()
418                .cloned()
419                .collect();
420            info!("Other available protocols: {}", names.join(", "));
421        }
422    }
423}
424
#[cfg(test)]
mod tests {
    use super::*;

    /// `RetryConfiguration::constant` stores the attempt budget and cooldown unchanged.
    #[test]
    fn test_retry_configuration_constant() {
        let RetryConfiguration::Constant(c) =
            RetryConfiguration::constant(5, Duration::from_secs(10));
        assert_eq!(c.max_attempts, 5);
        assert_eq!(c.cooldown, Duration::from_secs(10));
    }

    /// Both retry setters store the configuration they are given.
    #[test]
    fn test_stream_builder_retry_configs() {
        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));

        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .websockets_retry_config(&ws_config)
            .state_synchronizer_retry_config(&state_config);

        let RetryConfiguration::Constant(ws) = &builder.websockets_retry_config;
        let RetryConfiguration::Constant(state) = &builder.state_sync_retry_config;
        assert_eq!(ws.max_attempts, 10);
        assert_eq!(ws.cooldown, Duration::from_secs(2));
        assert_eq!(state.max_attempts, 20);
        assert_eq!(state.cooldown, Duration::from_secs(5));
    }

    /// A fresh builder has compression enabled and partial blocks disabled.
    #[test]
    fn test_default_stream_builder() {
        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
        assert!(builder.compression, "Compression should be enabled by default.");
        assert!(!builder.partial_blocks, "partial_blocks should be disabled by default.");
    }

    /// `build` rejects a builder without exchanges before any connection is attempted.
    #[tokio::test]
    async fn test_no_exchanges() {
        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .auth_key(Some("my_api_key".into()))
            .build()
            .await;
        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
    }

    /// End-to-end build against a live gateway; needs `TYCHO_AUTH_TOKEN` in the environment.
    #[ignore = "require tycho gateway"]
    #[tokio::test]
    async fn test_simple_build() {
        let token =
            env::var("TYCHO_AUTH_TOKEN").expect("TYCHO_AUTH_TOKEN must be set to run this test");
        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
            .auth_key(Some(token))
            .build()
            .await;

        // Report the error in the assertion message instead of a stray `dbg!`.
        assert!(
            receiver.is_ok(),
            "Client should build successfully with exchanges registered: {:?}",
            receiver.as_ref().err()
        );
    }
}