tycho_client/
stream.rs

1use std::{
2    collections::{HashMap, HashSet},
3    env,
4    time::Duration,
5};
6
7use thiserror::Error;
8use tokio::{sync::mpsc::Receiver, task::JoinHandle};
9use tracing::{info, warn};
10use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
11
12use crate::{
13    deltas::DeltasClient,
14    feed::{
15        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
16        BlockSynchronizer, FeedMessage,
17    },
18    rpc::RPCClient,
19    HttpRPCClient, WsDeltasClient,
20};
21
22#[derive(Error, Debug)]
23pub enum StreamError {
24    #[error("Error during stream set up: {0}")]
25    SetUpError(String),
26
27    #[error("WebSocket client connection error: {0}")]
28    WebSocketConnectionError(String),
29
30    #[error("BlockSynchronizer error: {0}")]
31    BlockSynchronizerError(String),
32}
33
34pub struct TychoStreamBuilder {
35    tycho_url: String,
36    chain: Chain,
37    exchanges: HashMap<String, ComponentFilter>,
38    block_time: u64,
39    timeout: u64,
40    max_missed_blocks: u64,
41    no_state: bool,
42    auth_key: Option<String>,
43    no_tls: bool,
44    include_tvl: bool,
45}
46
47impl TychoStreamBuilder {
48    /// Creates a new `TychoStreamBuilder` with the given Tycho URL and blockchain network.
49    /// Initializes the builder with default values for block time and timeout based on the chain.
50    pub fn new(tycho_url: &str, chain: Chain) -> Self {
51        let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
52        Self {
53            tycho_url: tycho_url.to_string(),
54            chain,
55            exchanges: HashMap::new(),
56            block_time,
57            timeout,
58            max_missed_blocks,
59            no_state: false,
60            auth_key: None,
61            no_tls: true,
62            include_tvl: false,
63        }
64    }
65
66    /// Returns the default block_time, timeout and max_missed_blocks values for the given
67    /// blockchain network.
68    fn default_timing(chain: &Chain) -> (u64, u64, u64) {
69        match chain {
70            Chain::Ethereum => (12, 36, 10),
71            Chain::Starknet => (2, 8, 50),
72            Chain::ZkSync => (3, 12, 50),
73            Chain::Arbitrum => (1, 2, 100), // Typically closer to 0.25s
74            Chain::Base => (2, 12, 50),
75            Chain::Unichain => (1, 10, 100),
76        }
77    }
78
79    /// Adds an exchange and its corresponding filter to the Tycho client.
80    pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
81        self.exchanges
82            .insert(name.to_string(), filter);
83        self
84    }
85
86    /// Sets the block time for the Tycho client.
87    pub fn block_time(mut self, block_time: u64) -> Self {
88        self.block_time = block_time;
89        self
90    }
91
92    /// Sets the timeout duration for network operations.
93    pub fn timeout(mut self, timeout: u64) -> Self {
94        self.timeout = timeout;
95        self
96    }
97
98    pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
99        self.max_missed_blocks = max_missed_blocks;
100        self
101    }
102
103    /// Configures the client to exclude state updates from the stream.
104    pub fn no_state(mut self, no_state: bool) -> Self {
105        self.no_state = no_state;
106        self
107    }
108
109    /// Sets the API key for authenticating with the Tycho server.
110    ///
111    /// Optionally you can set the TYCHO_AUTH_TOKEN env var instead. Make sure to set no_tsl
112    /// to false if you do this.
113    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
114        self.auth_key = auth_key;
115        self.no_tls = false;
116        self
117    }
118
119    /// Disables TLS/SSL for the connection, using `http` and `ws` protocols.
120    pub fn no_tls(mut self, no_tls: bool) -> Self {
121        self.no_tls = no_tls;
122        self
123    }
124
125    /// Configures the client to include TVL in the stream.
126    ///
127    /// If set to true, this will increase start-up time due to additional requests.
128    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
129        self.include_tvl = include_tvl;
130        self
131    }
132
133    /// Builds and starts the Tycho client, connecting to the Tycho server and
134    /// setting up the synchronization of exchange components.
135    pub async fn build(
136        self,
137    ) -> Result<(JoinHandle<()>, Receiver<FeedMessage<BlockHeader>>), StreamError> {
138        if self.exchanges.is_empty() {
139            return Err(StreamError::SetUpError(
140                "At least one exchange must be registered.".to_string(),
141            ));
142        }
143
144        // Attempt to read the authentication key from the environment variable if not provided
145        let auth_key = self
146            .auth_key
147            .clone()
148            .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
149
150        // Determine the URLs based on the TLS setting
151        let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
152            info!("Using non-secure connection: ws:// and http://");
153            let tycho_ws_url = format!("ws://{}", self.tycho_url);
154            let tycho_rpc_url = format!("http://{}", self.tycho_url);
155            (tycho_ws_url, tycho_rpc_url)
156        } else {
157            info!("Using secure connection: wss:// and https://");
158            let tycho_ws_url = format!("wss://{}", self.tycho_url);
159            let tycho_rpc_url = format!("https://{}", self.tycho_url);
160            (tycho_ws_url, tycho_rpc_url)
161        };
162
163        // Initialize the WebSocket client
164        let ws_client = WsDeltasClient::new(&tycho_ws_url, auth_key.as_deref())
165            .map_err(|e| StreamError::SetUpError(e.to_string()))?;
166        let rpc_client = HttpRPCClient::new(&tycho_rpc_url, auth_key.as_deref())
167            .map_err(|e| StreamError::SetUpError(e.to_string()))?;
168        let ws_jh = ws_client
169            .connect()
170            .await
171            .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
172
173        // Create and configure the BlockSynchronizer
174        let mut block_sync = BlockSynchronizer::new(
175            Duration::from_secs(self.block_time),
176            Duration::from_secs(self.timeout),
177            self.max_missed_blocks,
178        );
179
180        self.display_available_protocols(&rpc_client)
181            .await;
182
183        // Register each exchange with the BlockSynchronizer
184        for (name, filter) in self.exchanges {
185            info!("Registering exchange: {}", name);
186            let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
187            let sync = ProtocolStateSynchronizer::new(
188                id.clone(),
189                true,
190                filter,
191                3,
192                !self.no_state,
193                self.include_tvl,
194                rpc_client.clone(),
195                ws_client.clone(),
196                self.block_time + self.timeout,
197            );
198            block_sync = block_sync.register_synchronizer(id, sync);
199        }
200
201        // Start the BlockSynchronizer and monitor for disconnections
202        let (sync_jh, rx) = block_sync
203            .run()
204            .await
205            .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
206
207        // Monitor WebSocket and BlockSynchronizer futures
208        let handle = tokio::spawn(async move {
209            tokio::select! {
210                res = ws_jh => {
211                    let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
212                }
213                res = sync_jh => {
214                    res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
215                }
216            }
217        });
218
219        Ok((handle, rx))
220    }
221
222    /// Displays the other available protocols not registered to within this stream builder, for the
223    /// given chain.
224    async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
225        let available_protocols_set = rpc_client
226            .get_protocol_systems(&ProtocolSystemsRequestBody {
227                chain: self.chain,
228                pagination: PaginationParams { page: 0, page_size: 100 },
229            })
230            .await
231            .map(|resp| {
232                resp.protocol_systems
233                    .into_iter()
234                    .collect::<HashSet<_>>()
235            })
236            .map_err(|e| {
237                warn!(
238                    "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
239                );
240                e
241            })
242            .ok();
243
244        if let Some(not_requested_protocols) = available_protocols_set
245            .map(|available_protocols_set| {
246                let requested_protocol_set = self
247                    .exchanges
248                    .keys()
249                    .cloned()
250                    .collect::<HashSet<_>>();
251
252                available_protocols_set
253                    .difference(&requested_protocol_set)
254                    .cloned()
255                    .collect::<Vec<_>>()
256            })
257            .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
258        {
259            info!("Other available protocols: {}", not_requested_protocols.join(", "))
260        }
261    }
262}
263
264#[cfg(test)]
265mod tests {
266    use super::*;
267
268    #[tokio::test]
269    async fn test_no_exchanges() {
270        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
271            .auth_key(Some("my_api_key".into()))
272            .build()
273            .await;
274        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
275    }
276
277    #[ignore = "require tycho gateway"]
278    #[tokio::test]
279    async fn teat_simple_build() {
280        let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
281        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
282            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
283            .auth_key(Some(token))
284            .build()
285            .await;
286
287        dbg!(&receiver);
288
289        assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
290    }
291}