Skip to main content

tycho_client/
stream.rs

1use std::{
2    cmp::max,
3    collections::{HashMap, HashSet},
4    env,
5    time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
12
13use crate::{
14    deltas::DeltasClient,
15    feed::{
16        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
17        BlockSynchronizer, BlockSynchronizerError, FeedMessage,
18    },
19    rpc::{HttpRPCClientOptions, RPCClient},
20    HttpRPCClient, WsDeltasClient,
21};
22
23#[derive(Error, Debug)]
24pub enum StreamError {
25    #[error("Error during stream set up: {0}")]
26    SetUpError(String),
27
28    #[error("WebSocket client connection error: {0}")]
29    WebSocketConnectionError(String),
30
31    #[error("BlockSynchronizer error: {0}")]
32    BlockSynchronizerError(String),
33}
34
35#[non_exhaustive]
36#[derive(Clone, Debug)]
37pub enum RetryConfiguration {
38    Constant(ConstantRetryConfiguration),
39}
40
41impl RetryConfiguration {
42    pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
43        RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
44    }
45}
46
47#[derive(Clone, Debug)]
48pub struct ConstantRetryConfiguration {
49    max_attempts: u64,
50    cooldown: Duration,
51}
52
53pub struct TychoStreamBuilder {
54    tycho_url: String,
55    chain: Chain,
56    exchanges: HashMap<String, ComponentFilter>,
57    block_time: u64,
58    timeout: u64,
59    startup_timeout: Duration,
60    max_missed_blocks: u64,
61    state_sync_retry_config: RetryConfiguration,
62    websockets_retry_config: RetryConfiguration,
63    no_state: bool,
64    auth_key: Option<String>,
65    no_tls: bool,
66    include_tvl: bool,
67    compression: bool,
68    partial_blocks: bool,
69}
70
71impl TychoStreamBuilder {
72    /// Creates a new `TychoStreamBuilder` with the given Tycho URL and blockchain network.
73    /// Initializes the builder with default values for block time and timeout based on the chain.
74    pub fn new(tycho_url: &str, chain: Chain) -> Self {
75        let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
76        Self {
77            tycho_url: tycho_url.to_string(),
78            chain,
79            exchanges: HashMap::new(),
80            block_time,
81            timeout,
82            startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
83            max_missed_blocks,
84            state_sync_retry_config: RetryConfiguration::constant(
85                32,
86                Duration::from_secs(max(block_time / 4, 2)),
87            ),
88            websockets_retry_config: RetryConfiguration::constant(
89                128,
90                Duration::from_secs(max(block_time / 6, 1)),
91            ),
92            no_state: false,
93            auth_key: None,
94            no_tls: true,
95            include_tvl: false,
96            compression: true,
97            partial_blocks: false,
98        }
99    }
100
101    /// Returns the default block_time, timeout and max_missed_blocks values for the given
102    /// blockchain network.
103    fn default_timing(chain: &Chain) -> (u64, u64, u64) {
104        match chain {
105            Chain::Ethereum => (12, 36, 50),
106            Chain::Starknet => (2, 8, 50),
107            Chain::ZkSync => (3, 12, 50),
108            Chain::Arbitrum => (1, 2, 100), // Typically closer to 0.25s
109            Chain::Base => (2, 12, 50),
110            Chain::Bsc => (1, 12, 50),
111            Chain::Unichain => (1, 10, 100),
112        }
113    }
114
115    /// Adds an exchange and its corresponding filter to the Tycho client.
116    pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
117        self.exchanges
118            .insert(name.to_string(), filter);
119        self
120    }
121
122    /// Sets the block time for the Tycho client.
123    pub fn block_time(mut self, block_time: u64) -> Self {
124        self.block_time = block_time;
125        self
126    }
127
128    /// Sets the timeout duration for network operations.
129    pub fn timeout(mut self, timeout: u64) -> Self {
130        self.timeout = timeout;
131        self
132    }
133
134    pub fn startup_timeout(mut self, timeout: Duration) -> Self {
135        self.startup_timeout = timeout;
136        self
137    }
138
139    pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
140        self.max_missed_blocks = max_missed_blocks;
141        self
142    }
143
144    pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
145        self.websockets_retry_config = retry_config.clone();
146        self.warn_on_potential_timing_issues();
147        self
148    }
149
150    pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
151        self.state_sync_retry_config = retry_config.clone();
152        self.warn_on_potential_timing_issues();
153        self
154    }
155
156    fn warn_on_potential_timing_issues(&self) {
157        let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
158            (&self.state_sync_retry_config, &self.websockets_retry_config);
159
160        if ws_config.cooldown >= state_config.cooldown {
161            warn!(
162                "Websocket cooldown should be < than state syncronizer cooldown \
163                to avoid spending retries due to disconnected websocket."
164            )
165        }
166    }
167
168    /// Configures the client to exclude state updates from the stream.
169    pub fn no_state(mut self, no_state: bool) -> Self {
170        self.no_state = no_state;
171        self
172    }
173
174    /// Sets the API key for authenticating with the Tycho server.
175    ///
176    /// Optionally you can set the TYCHO_AUTH_TOKEN env var instead. Make sure to set no_tsl
177    /// to false if you do this.
178    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
179        self.auth_key = auth_key;
180        self.no_tls = false;
181        self
182    }
183
184    /// Disables TLS/SSL for the connection, using `http` and `ws` protocols.
185    pub fn no_tls(mut self, no_tls: bool) -> Self {
186        self.no_tls = no_tls;
187        self
188    }
189
190    /// Configures the client to include TVL in the stream.
191    ///
192    /// If set to true, this will increase start-up time due to additional requests.
193    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
194        self.include_tvl = include_tvl;
195        self
196    }
197
198    /// Disables compression for RPC and WebSocket communication.
199    /// By default, messages are compressed using zstd.
200    pub fn disable_compression(mut self) -> Self {
201        self.compression = false;
202        self
203    }
204
205    /// Enables the client to receive partial block updates (flashblocks).
206    pub fn enable_partial_blocks(mut self) -> Self {
207        self.partial_blocks = true;
208        self
209    }
210
211    /// Builds and starts the Tycho client, connecting to the Tycho server and
212    /// setting up the synchronization of exchange components.
213    pub async fn build(
214        self,
215    ) -> Result<
216        (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
217        StreamError,
218    > {
219        if self.exchanges.is_empty() {
220            return Err(StreamError::SetUpError(
221                "At least one exchange must be registered.".to_string(),
222            ));
223        }
224
225        // Attempt to read the authentication key from the environment variable if not provided
226        let auth_key = self
227            .auth_key
228            .clone()
229            .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
230
231        info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
232
233        // Determine the URLs based on the TLS setting
234        let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
235            info!("Using non-secure connection: ws:// and http://");
236            let tycho_ws_url = format!("ws://{}", self.tycho_url);
237            let tycho_rpc_url = format!("http://{}", self.tycho_url);
238            (tycho_ws_url, tycho_rpc_url)
239        } else {
240            info!("Using secure connection: wss:// and https://");
241            let tycho_ws_url = format!("wss://{}", self.tycho_url);
242            let tycho_rpc_url = format!("https://{}", self.tycho_url);
243            (tycho_ws_url, tycho_rpc_url)
244        };
245
246        // Initialize the WebSocket client
247        let ws_client = match &self.websockets_retry_config {
248            RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
249                &tycho_ws_url,
250                auth_key.as_deref(),
251                config.max_attempts,
252                config.cooldown,
253            ),
254        }
255        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
256        let rpc_client = HttpRPCClient::new(
257            &tycho_rpc_url,
258            HttpRPCClientOptions::new()
259                .with_auth_key(auth_key)
260                .with_compression(self.compression),
261        )
262        .map_err(|e| StreamError::SetUpError(e.to_string()))?;
263        let ws_jh = ws_client
264            .connect()
265            .await
266            .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
267
268        // Create and configure the BlockSynchronizer
269        let mut block_sync = BlockSynchronizer::new(
270            Duration::from_secs(self.block_time),
271            Duration::from_secs(self.timeout),
272            self.max_missed_blocks,
273        );
274
275        self.display_available_protocols(&rpc_client)
276            .await;
277
278        // Register each exchange with the BlockSynchronizer
279        for (name, filter) in self.exchanges {
280            info!("Registering exchange: {}", name);
281            let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
282            let sync = match &self.state_sync_retry_config {
283                RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
284                    id.clone(),
285                    true,
286                    filter,
287                    retry_config.max_attempts,
288                    retry_config.cooldown,
289                    !self.no_state,
290                    self.include_tvl,
291                    self.compression,
292                    rpc_client.clone(),
293                    ws_client.clone(),
294                    self.block_time + self.timeout,
295                )
296                .with_partial_blocks(self.partial_blocks),
297            };
298            block_sync = block_sync.register_synchronizer(id, sync);
299        }
300
301        // Start the BlockSynchronizer and monitor for disconnections
302        let (sync_jh, rx) = block_sync
303            .run()
304            .await
305            .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
306
307        // Monitor WebSocket and BlockSynchronizer futures
308        let handle = tokio::spawn(async move {
309            tokio::select! {
310                res = ws_jh => {
311                    let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
312                }
313                res = sync_jh => {
314                    res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
315                }
316            }
317            if let Err(e) = ws_client.close().await {
318                warn!(?e, "Failed to close WebSocket client");
319            }
320        });
321
322        Ok((handle, rx))
323    }
324
325    /// Displays the other available protocols not registered to within this stream builder, for the
326    /// given chain.
327    async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
328        let available_protocols_set = rpc_client
329            .get_protocol_systems(&ProtocolSystemsRequestBody {
330                chain: self.chain,
331                pagination: PaginationParams { page: 0, page_size: 100 },
332            })
333            .await
334            .map(|resp| {
335                resp.protocol_systems
336                    .into_iter()
337                    .collect::<HashSet<_>>()
338            })
339            .map_err(|e| {
340                warn!(
341                    "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
342                );
343                e
344            })
345            .ok();
346
347        if let Some(not_requested_protocols) = available_protocols_set
348            .map(|available_protocols_set| {
349                let requested_protocol_set = self
350                    .exchanges
351                    .keys()
352                    .cloned()
353                    .collect::<HashSet<_>>();
354
355                available_protocols_set
356                    .difference(&requested_protocol_set)
357                    .cloned()
358                    .collect::<Vec<_>>()
359            })
360            .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
361        {
362            info!("Other available protocols: {}", not_requested_protocols.join(", "))
363        }
364    }
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370
371    #[test]
372    fn test_retry_configuration_constant() {
373        let config = RetryConfiguration::constant(5, Duration::from_secs(10));
374        match config {
375            RetryConfiguration::Constant(c) => {
376                assert_eq!(c.max_attempts, 5);
377                assert_eq!(c.cooldown, Duration::from_secs(10));
378            }
379        }
380    }
381
382    #[test]
383    fn test_stream_builder_retry_configs() {
384        let mut builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
385        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
386        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));
387
388        builder = builder
389            .websockets_retry_config(&ws_config)
390            .state_synchronizer_retry_config(&state_config);
391
392        // Verify configs are stored correctly by checking they match expected values
393        match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
394            (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
395                assert_eq!(ws.max_attempts, 10);
396                assert_eq!(ws.cooldown, Duration::from_secs(2));
397                assert_eq!(state.max_attempts, 20);
398                assert_eq!(state.cooldown, Duration::from_secs(5));
399            }
400        }
401    }
402
403    #[test]
404    fn test_default_stream_builder() {
405        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
406        assert!(builder.compression, "Compression should be enabled by default.");
407        assert!(!builder.partial_blocks, "partial_blocks should be disabled by default.");
408    }
409
410    #[tokio::test]
411    async fn test_no_exchanges() {
412        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
413            .auth_key(Some("my_api_key".into()))
414            .build()
415            .await;
416        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
417    }
418
419    #[ignore = "require tycho gateway"]
420    #[tokio::test]
421    async fn test_simple_build() {
422        let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
423        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
424            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
425            .auth_key(Some(token))
426            .build()
427            .await;
428
429        dbg!(&receiver);
430
431        assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
432    }
433}