tycho_client/
stream.rs

1use std::{
2    collections::{HashMap, HashSet},
3    env,
4    time::Duration,
5};
6
7use thiserror::Error;
8use tokio::{sync::mpsc::Receiver, task::JoinHandle};
9use tracing::{info, warn};
10use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
11
12use crate::{
13    deltas::DeltasClient,
14    feed::{
15        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
16        BlockSynchronizer, FeedMessage,
17    },
18    rpc::RPCClient,
19    HttpRPCClient, WsDeltasClient,
20};
21
22#[derive(Error, Debug)]
23pub enum StreamError {
24    #[error("Error during stream set up: {0}")]
25    SetUpError(String),
26
27    #[error("WebSocket client connection error: {0}")]
28    WebSocketConnectionError(String),
29
30    #[error("BlockSynchronizer error: {0}")]
31    BlockSynchronizerError(String),
32}
33
34pub struct TychoStreamBuilder {
35    tycho_url: String,
36    chain: Chain,
37    exchanges: HashMap<String, ComponentFilter>,
38    block_time: u64,
39    timeout: u64,
40    max_missed_blocks: u64,
41    no_state: bool,
42    auth_key: Option<String>,
43    no_tls: bool,
44    include_tvl: bool,
45}
46
47impl TychoStreamBuilder {
48    /// Creates a new `TychoStreamBuilder` with the given Tycho URL and blockchain network.
49    /// Initializes the builder with default values for block time and timeout based on the chain.
50    pub fn new(tycho_url: &str, chain: Chain) -> Self {
51        let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
52        Self {
53            tycho_url: tycho_url.to_string(),
54            chain,
55            exchanges: HashMap::new(),
56            block_time,
57            timeout,
58            max_missed_blocks,
59            no_state: false,
60            auth_key: None,
61            no_tls: true,
62            include_tvl: false,
63        }
64    }
65
66    /// Returns the default block_time, timeout and max_missed_blocks values for the given
67    /// blockchain network.
68    fn default_timing(chain: &Chain) -> (u64, u64, u64) {
69        match chain {
70            Chain::Ethereum => (12, 36, 10),
71            Chain::Starknet => (2, 8, 50),
72            Chain::ZkSync => (3, 12, 50),
73            Chain::Arbitrum => (1, 2, 100), // Typically closer to 0.25s
74            Chain::Base => (2, 12, 50),
75            Chain::Unichain => (1, 10, 100),
76        }
77    }
78
79    /// Adds an exchange and its corresponding filter to the Tycho client.
80    pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
81        self.exchanges
82            .insert(name.to_string(), filter);
83        self
84    }
85
86    /// Sets the block time for the Tycho client.
87    pub fn block_time(mut self, block_time: u64) -> Self {
88        self.block_time = block_time;
89        self
90    }
91
92    /// Sets the timeout duration for network operations.
93    pub fn timeout(mut self, timeout: u64) -> Self {
94        self.timeout = timeout;
95        self
96    }
97
98    pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
99        self.max_missed_blocks = max_missed_blocks;
100        self
101    }
102
103    /// Configures the client to exclude state updates from the stream.
104    pub fn no_state(mut self, no_state: bool) -> Self {
105        self.no_state = no_state;
106        self
107    }
108
109    /// Sets the API key for authenticating with the Tycho server.
110    ///
111    /// Optionally you can set the TYCHO_AUTH_TOKEN env var instead. Make sure to set no_tsl
112    /// to false if you do this.
113    pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
114        self.auth_key = auth_key;
115        self.no_tls = false;
116        self
117    }
118
119    /// Disables TLS/SSL for the connection, using `http` and `ws` protocols.
120    pub fn no_tls(mut self, no_tls: bool) -> Self {
121        self.no_tls = no_tls;
122        self
123    }
124
125    /// Configures the client to include TVL in the stream.
126    ///
127    /// If set to true, this will increase start-up time due to additional requests.
128    pub fn include_tvl(mut self, include_tvl: bool) -> Self {
129        self.include_tvl = include_tvl;
130        self
131    }
132
133    /// Builds and starts the Tycho client, connecting to the Tycho server and
134    /// setting up the synchronization of exchange components.
135    pub async fn build(self) -> Result<(JoinHandle<()>, Receiver<FeedMessage>), StreamError> {
136        if self.exchanges.is_empty() {
137            return Err(StreamError::SetUpError(
138                "At least one exchange must be registered.".to_string(),
139            ));
140        }
141
142        // Attempt to read the authentication key from the environment variable if not provided
143        let auth_key = self
144            .auth_key
145            .clone()
146            .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
147
148        // Determine the URLs based on the TLS setting
149        let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
150            info!("Using non-secure connection: ws:// and http://");
151            let tycho_ws_url = format!("ws://{}", self.tycho_url);
152            let tycho_rpc_url = format!("http://{}", self.tycho_url);
153            (tycho_ws_url, tycho_rpc_url)
154        } else {
155            info!("Using secure connection: wss:// and https://");
156            let tycho_ws_url = format!("wss://{}", self.tycho_url);
157            let tycho_rpc_url = format!("https://{}", self.tycho_url);
158            (tycho_ws_url, tycho_rpc_url)
159        };
160
161        // Initialize the WebSocket client
162        let ws_client = WsDeltasClient::new(&tycho_ws_url, auth_key.as_deref())
163            .map_err(|e| StreamError::SetUpError(e.to_string()))?;
164        let rpc_client = HttpRPCClient::new(&tycho_rpc_url, auth_key.as_deref())
165            .map_err(|e| StreamError::SetUpError(e.to_string()))?;
166        let ws_jh = ws_client
167            .connect()
168            .await
169            .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
170
171        // Create and configure the BlockSynchronizer
172        let mut block_sync = BlockSynchronizer::new(
173            Duration::from_secs(self.block_time),
174            Duration::from_secs(self.timeout),
175            self.max_missed_blocks,
176        );
177
178        self.display_available_protocols(&rpc_client)
179            .await;
180
181        // Register each exchange with the BlockSynchronizer
182        for (name, filter) in self.exchanges {
183            info!("Registering exchange: {}", name);
184            let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
185            let sync = ProtocolStateSynchronizer::new(
186                id.clone(),
187                true,
188                filter,
189                3,
190                !self.no_state,
191                self.include_tvl,
192                rpc_client.clone(),
193                ws_client.clone(),
194                self.block_time + self.timeout,
195            );
196            block_sync = block_sync.register_synchronizer(id, sync);
197        }
198
199        // Start the BlockSynchronizer and monitor for disconnections
200        let (sync_jh, rx) = block_sync
201            .run()
202            .await
203            .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
204
205        // Monitor WebSocket and BlockSynchronizer futures
206        let handle = tokio::spawn(async move {
207            tokio::select! {
208                res = ws_jh => {
209                    let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
210                }
211                res = sync_jh => {
212                    res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
213                }
214            }
215        });
216
217        Ok((handle, rx))
218    }
219
220    /// Displays the other available protocols not registered to within this stream builder, for the
221    /// given chain.
222    async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
223        let available_protocols_set = rpc_client
224            .get_protocol_systems(&ProtocolSystemsRequestBody {
225                chain: self.chain,
226                pagination: PaginationParams { page: 0, page_size: 100 },
227            })
228            .await
229            .map(|resp| {
230                resp.protocol_systems
231                    .into_iter()
232                    .collect::<HashSet<_>>()
233            })
234            .map_err(|e| {
235                warn!(
236                    "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
237                );
238                e
239            })
240            .ok();
241
242        if let Some(not_requested_protocols) = available_protocols_set
243            .map(|available_protocols_set| {
244                let requested_protocol_set = self
245                    .exchanges
246                    .keys()
247                    .cloned()
248                    .collect::<HashSet<_>>();
249
250                available_protocols_set
251                    .difference(&requested_protocol_set)
252                    .cloned()
253                    .collect::<Vec<_>>()
254            })
255            .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
256        {
257            info!("Other available protocols: {}", not_requested_protocols.join(", "))
258        }
259    }
260}
261
262#[cfg(test)]
263mod tests {
264    use super::*;
265
266    #[tokio::test]
267    async fn test_no_exchanges() {
268        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
269            .auth_key(Some("my_api_key".into()))
270            .build()
271            .await;
272        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
273    }
274
275    #[ignore = "require tycho gateway"]
276    #[tokio::test]
277    async fn teat_simple_build() {
278        let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
279        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
280            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
281            .auth_key(Some(token))
282            .build()
283            .await;
284
285        dbg!(&receiver);
286
287        assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
288    }
289}