// tycho_client/stream.rs

1use std::{
2    cmp::max,
3    collections::{HashMap, HashSet},
4    env,
5    time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
12
13use crate::{
14    deltas::DeltasClient,
15    feed::{
16        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
17        BlockSynchronizer, BlockSynchronizerError, FeedMessage,
18    },
19    rpc::{HttpRPCClientOptions, RPCClient},
20    HttpRPCClient, WsDeltasClient,
21};
22
23#[derive(Error, Debug)]
24pub enum StreamError {
25    #[error("Error during stream set up: {0}")]
26    SetUpError(String),
27
28    #[error("WebSocket client connection error: {0}")]
29    WebSocketConnectionError(String),
30
31    #[error("BlockSynchronizer error: {0}")]
32    BlockSynchronizerError(String),
33}
34
35#[non_exhaustive]
36#[derive(Clone, Debug)]
37pub enum RetryConfiguration {
38    Constant(ConstantRetryConfiguration),
39}
40
41impl RetryConfiguration {
42    pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
43        RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
44    }
45}
46
/// Payload of [`RetryConfiguration::Constant`].
///
/// Fields are private; instances are created through [`RetryConfiguration::constant`].
#[derive(Debug, Clone)]
pub struct ConstantRetryConfiguration {
    /// Maximum number of attempts.
    max_attempts: u64,
    /// Cooldown duration associated with each attempt.
    cooldown: Duration,
}
52
53pub struct TychoStreamBuilder {
54    tycho_url: String,
55    chain: Chain,
56    exchanges: HashMap<String, ComponentFilter>,
57    block_time: u64,
58    timeout: u64,
59    startup_timeout: Duration,
60    max_missed_blocks: u64,
61    state_sync_retry_config: RetryConfiguration,
62    websockets_retry_config: RetryConfiguration,
63    no_state: bool,
64    auth_key: Option<String>,
65    no_tls: bool,
66    include_tvl: bool,
67    compression: bool,
68    partial_blocks: bool,
69}
70
71impl TychoStreamBuilder {
72    /// Creates a new `TychoStreamBuilder` with the given Tycho URL and blockchain network.
73    /// Initializes the builder with default values for block time and timeout based on the chain.
74    pub fn new(tycho_url: &str, chain: Chain) -> Self {
75        let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
76        Self {
77            tycho_url: tycho_url.to_string(),
78            chain,
79            exchanges: HashMap::new(),
80            block_time,
81            timeout,
82            startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
83            max_missed_blocks,
84            state_sync_retry_config: RetryConfiguration::constant(
85                32,
86                Duration::from_secs(max(block_time / 4, 2)),
87            ),
88            websockets_retry_config: RetryConfiguration::constant(
89                128,
90                Duration::from_secs(max(block_time / 6, 1)),
91            ),
92            no_state: false,
93            auth_key: None,
94            no_tls: true,
95            include_tvl: false,
96            compression: true,
97            partial_blocks: false,
98        }
99    }
100
101    /// Returns the default block_time, timeout and max_missed_blocks values for the given
102    /// blockchain network.
103    fn default_timing(chain: &Chain) -> (u64, u64, u64) {
104        match chain {
105            Chain::Ethereum => (12, 36, 50),
106            Chain::Starknet => (2, 8, 50),
107            Chain::ZkSync => (3, 12, 50),
108            Chain::Arbitrum => (1, 2, 100), // Typically closer to 0.25s
109            Chain::Base => (2, 12, 50),
110            Chain::Bsc => (1, 12, 50),
111            Chain::Unichain => (1, 10, 100),
112        }
113    }
114
115    /// Adds an exchange and its corresponding filter to the Tycho client.
116    pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
117        self.exchanges
118            .insert(name.to_string(), filter);
119        self
120    }
121
122    /// Sets the block time for the Tycho client.
123    pub fn block_time(mut self, block_time: u64) -> Self {
124        self.block_time = block_time;
125        self
126    }
127
128    /// Sets the timeout duration for network operations.
129    pub fn timeout(mut self, timeout: u64) -> Self {
130        self.timeout = timeout;
131        self
132    }
133
134    pub fn startup_timeout(mut self, timeout: Duration) -> Self {
135        self.startup_timeout = timeout;
136        self
137    }
138
139    pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
140        self.max_missed_blocks = max_missed_blocks;
141        self
142    }
143
144    pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
145        self.websockets_retry_config = retry_config.clone();
146        self.warn_on_potential_timing_issues();
147        self
148    }
149
150    pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
151        self.state_sync_retry_config = retry_config.clone();
152        self.warn_on_potential_timing_issues();
153        self
154    }
155
156    fn warn_on_potential_timing_issues(&self) {
157        let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
158            (&self.state_sync_retry_config, &self.websockets_retry_config);
159
160        if ws_config.cooldown >= state_config.cooldown {
161            warn!(
162                "Websocket cooldown should be < than state syncronizer cooldown \
163                to avoid spending retries due to disconnected websocket."
164            )
165        }
166    }
167
168    /// Configures the client to exclude state updates from the stream.
169    pub fn no_state(mut self, no_state: bool) -> Self {
170        self.no_state = no_state;
171        self
172    }
173
174    /// Sets the API key for authenticating with the Tycho server.
175    ///
176    /// Optionally you can set the TYCHO_AUTH_TOKEN env var instead. Make sure to set no_tsl
177    /// to false if you do this.
178    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
179        self.auth_key = auth_key;
180        self.no_tls = false;
181        self
182    }
183
184    /// Disables TLS/SSL for the connection, using `http` and `ws` protocols.
185    pub fn no_tls(mut self, no_tls: bool) -> Self {
186        self.no_tls = no_tls;
187        self
188    }
189
190    /// Configures the client to include TVL in the stream.
191    ///
192    /// If set to true, this will increase start-up time due to additional requests.
193    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
194        self.include_tvl = include_tvl;
195        self
196    }
197
198    /// Disables compression for RPC and WebSocket communication.
199    /// By default, messages are compressed using zstd.
200    pub fn disable_compression(mut self) -> Self {
201        self.compression = false;
202        self
203    }
204
205    /// Configures the client to receive partial block updates (flashblocks).
206    /// When enabled, the client will receive incremental updates within a block.
207    pub fn with_partial_blocks(mut self, val: bool) -> Self {
208        self.partial_blocks = val;
209        self
210    }
211
212    /// Builds and starts the Tycho client, connecting to the Tycho server and
213    /// setting up the synchronization of exchange components.
214    pub async fn build(
215        self,
216    ) -> Result<
217        (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
218        StreamError,
219    > {
220        if self.exchanges.is_empty() {
221            return Err(StreamError::SetUpError(
222                "At least one exchange must be registered.".to_string(),
223            ));
224        }
225
226        // Attempt to read the authentication key from the environment variable if not provided
227        let auth_key = self
228            .auth_key
229            .clone()
230            .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
231
232        info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
233
234        // Determine the URLs based on the TLS setting
235        let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
236            info!("Using non-secure connection: ws:// and http://");
237            let tycho_ws_url = format!("ws://{}", self.tycho_url);
238            let tycho_rpc_url = format!("http://{}", self.tycho_url);
239            (tycho_ws_url, tycho_rpc_url)
240        } else {
241            info!("Using secure connection: wss:// and https://");
242            let tycho_ws_url = format!("wss://{}", self.tycho_url);
243            let tycho_rpc_url = format!("https://{}", self.tycho_url);
244            (tycho_ws_url, tycho_rpc_url)
245        };
246
247        // Initialize the WebSocket client
248        let ws_client = match &self.websockets_retry_config {
249            RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
250                &tycho_ws_url,
251                auth_key.as_deref(),
252                config.max_attempts,
253                config.cooldown,
254            ),
255        }
256        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
257        let rpc_client = HttpRPCClient::new(
258            &tycho_rpc_url,
259            HttpRPCClientOptions::new()
260                .with_auth_key(auth_key)
261                .with_compression(self.compression),
262        )
263        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
264        let ws_jh = ws_client
265            .connect()
266            .await
267            .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
268
269        // Create and configure the BlockSynchronizer
270        let mut block_sync = BlockSynchronizer::new(
271            Duration::from_secs(self.block_time),
272            Duration::from_secs(self.timeout),
273            self.max_missed_blocks,
274        );
275
276        self.display_available_protocols(&rpc_client)
277            .await;
278
279        // Register each exchange with the BlockSynchronizer
280        for (name, filter) in self.exchanges {
281            info!("Registering exchange: {}", name);
282            let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
283            let sync = match &self.state_sync_retry_config {
284                RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
285                    id.clone(),
286                    true,
287                    filter,
288                    retry_config.max_attempts,
289                    retry_config.cooldown,
290                    !self.no_state,
291                    self.include_tvl,
292                    self.compression,
293                    rpc_client.clone(),
294                    ws_client.clone(),
295                    self.block_time + self.timeout,
296                )
297                .with_partial_blocks(self.partial_blocks),
298            };
299            block_sync = block_sync.register_synchronizer(id, sync);
300        }
301
302        // Start the BlockSynchronizer and monitor for disconnections
303        let (sync_jh, rx) = block_sync
304            .run()
305            .await
306            .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
307
308        // Monitor WebSocket and BlockSynchronizer futures
309        let handle = tokio::spawn(async move {
310            tokio::select! {
311                res = ws_jh => {
312                    let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
313                }
314                res = sync_jh => {
315                    res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
316                }
317            }
318            if let Err(e) = ws_client.close().await {
319                warn!(?e, "Failed to close WebSocket client");
320            }
321        });
322
323        Ok((handle, rx))
324    }
325
326    /// Displays the other available protocols not registered to within this stream builder, for the
327    /// given chain.
328    async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
329        let available_protocols_set = rpc_client
330            .get_protocol_systems(&ProtocolSystemsRequestBody {
331                chain: self.chain,
332                pagination: PaginationParams { page: 0, page_size: 100 },
333            })
334            .await
335            .map(|resp| {
336                resp.protocol_systems
337                    .into_iter()
338                    .collect::<HashSet<_>>()
339            })
340            .map_err(|e| {
341                warn!(
342                    "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
343                );
344                e
345            })
346            .ok();
347
348        if let Some(not_requested_protocols) = available_protocols_set
349            .map(|available_protocols_set| {
350                let requested_protocol_set = self
351                    .exchanges
352                    .keys()
353                    .cloned()
354                    .collect::<HashSet<_>>();
355
356                available_protocols_set
357                    .difference(&requested_protocol_set)
358                    .cloned()
359                    .collect::<Vec<_>>()
360            })
361            .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
362        {
363            info!("Other available protocols: {}", not_requested_protocols.join(", "))
364        }
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    /// `RetryConfiguration::constant` stores both arguments unchanged.
    #[test]
    fn test_retry_configuration_constant() {
        let config = RetryConfiguration::constant(5, Duration::from_secs(10));
        match config {
            RetryConfiguration::Constant(c) => {
                assert_eq!(c.max_attempts, 5);
                assert_eq!(c.cooldown, Duration::from_secs(10));
            }
        }
    }

    /// Both retry setters store the configuration they are given.
    #[test]
    fn test_stream_builder_retry_configs() {
        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));

        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .websockets_retry_config(&ws_config)
            .state_synchronizer_retry_config(&state_config);

        // Verify configs are stored correctly by checking they match expected values
        match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
            (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
                assert_eq!(ws.max_attempts, 10);
                assert_eq!(ws.cooldown, Duration::from_secs(2));
                assert_eq!(state.max_attempts, 20);
                assert_eq!(state.cooldown, Duration::from_secs(5));
            }
        }
    }

    /// A freshly created builder uses the documented defaults.
    #[test]
    fn test_default_stream_builder() {
        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
        assert!(builder.compression, "Compression should be enabled by default.");
        assert!(!builder.partial_blocks, "partial_blocks should be disabled by default.");
        assert!(builder.no_tls, "TLS should be disabled by default.");
        assert!(builder.auth_key.is_none(), "No auth key should be set by default.");
    }

    /// Setting an auth key switches the builder to TLS.
    #[test]
    fn test_auth_key_enables_tls() {
        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .auth_key(Some("my_api_key".into()));
        assert!(!builder.no_tls, "Setting an auth key should enable TLS.");
        assert_eq!(builder.auth_key.as_deref(), Some("my_api_key"));
    }

    /// `build` rejects a builder without any registered exchange before connecting.
    #[tokio::test]
    async fn test_no_exchanges() {
        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .auth_key(Some("my_api_key".into()))
            .build()
            .await;
        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
    }

    /// End-to-end build against a live gateway; needs `TYCHO_AUTH_TOKEN` in the environment.
    #[ignore = "require tycho gateway"]
    #[tokio::test]
    async fn test_simple_build() {
        let token = env::var("TYCHO_AUTH_TOKEN")
            .expect("TYCHO_AUTH_TOKEN must be set to run this test");
        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
            .auth_key(Some(token))
            .build()
            .await;

        // Report the error in the assertion message instead of an unconditional `dbg!`.
        assert!(
            receiver.is_ok(),
            "Client should build successfully with exchanges registered: {:?}",
            receiver.as_ref().err()
        );
    }
}