1use std::{
2 cmp::max,
3 collections::{HashMap, HashSet},
4 env,
5 time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
12
13use crate::{
14 deltas::DeltasClient,
15 feed::{
16 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
17 BlockSynchronizer, BlockSynchronizerError, FeedMessage,
18 },
19 rpc::{HttpRPCClientOptions, RPCClient},
20 HttpRPCClient, WsDeltasClient,
21};
22
23#[derive(Error, Debug)]
24pub enum StreamError {
25 #[error("Error during stream set up: {0}")]
26 SetUpError(String),
27
28 #[error("WebSocket client connection error: {0}")]
29 WebSocketConnectionError(String),
30
31 #[error("BlockSynchronizer error: {0}")]
32 BlockSynchronizerError(String),
33}
34
35#[non_exhaustive]
36#[derive(Clone, Debug)]
37pub enum RetryConfiguration {
38 Constant(ConstantRetryConfiguration),
39}
40
41impl RetryConfiguration {
42 pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
43 RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
44 }
45}
46
47#[derive(Clone, Debug)]
48pub struct ConstantRetryConfiguration {
49 max_attempts: u64,
50 cooldown: Duration,
51}
52
53pub struct TychoStreamBuilder {
54 tycho_url: String,
55 chain: Chain,
56 exchanges: HashMap<String, ComponentFilter>,
57 block_time: u64,
58 timeout: u64,
59 startup_timeout: Duration,
60 max_missed_blocks: u64,
61 state_sync_retry_config: RetryConfiguration,
62 websockets_retry_config: RetryConfiguration,
63 no_state: bool,
64 auth_key: Option<String>,
65 no_tls: bool,
66 include_tvl: bool,
67 compression: bool,
68}
69
70impl TychoStreamBuilder {
71 pub fn new(tycho_url: &str, chain: Chain) -> Self {
74 let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
75 Self {
76 tycho_url: tycho_url.to_string(),
77 chain,
78 exchanges: HashMap::new(),
79 block_time,
80 timeout,
81 startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
82 max_missed_blocks,
83 state_sync_retry_config: RetryConfiguration::constant(
84 32,
85 Duration::from_secs(max(block_time / 4, 2)),
86 ),
87 websockets_retry_config: RetryConfiguration::constant(
88 128,
89 Duration::from_secs(max(block_time / 6, 1)),
90 ),
91 no_state: false,
92 auth_key: None,
93 no_tls: true,
94 include_tvl: false,
95 compression: true,
96 }
97 }
98
99 fn default_timing(chain: &Chain) -> (u64, u64, u64) {
102 match chain {
103 Chain::Ethereum => (12, 36, 50),
104 Chain::Starknet => (2, 8, 50),
105 Chain::ZkSync => (3, 12, 50),
106 Chain::Arbitrum => (1, 2, 100), Chain::Base => (2, 12, 50),
108 Chain::Bsc => (1, 12, 50),
109 Chain::Unichain => (1, 10, 100),
110 }
111 }
112
113 pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
115 self.exchanges
116 .insert(name.to_string(), filter);
117 self
118 }
119
120 pub fn block_time(mut self, block_time: u64) -> Self {
122 self.block_time = block_time;
123 self
124 }
125
126 pub fn timeout(mut self, timeout: u64) -> Self {
128 self.timeout = timeout;
129 self
130 }
131
132 pub fn startup_timeout(mut self, timeout: Duration) -> Self {
133 self.startup_timeout = timeout;
134 self
135 }
136
137 pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
138 self.max_missed_blocks = max_missed_blocks;
139 self
140 }
141
142 pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
143 self.websockets_retry_config = retry_config.clone();
144 self.warn_on_potential_timing_issues();
145 self
146 }
147
148 pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
149 self.state_sync_retry_config = retry_config.clone();
150 self.warn_on_potential_timing_issues();
151 self
152 }
153
154 fn warn_on_potential_timing_issues(&self) {
155 let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
156 (&self.state_sync_retry_config, &self.websockets_retry_config);
157
158 if ws_config.cooldown >= state_config.cooldown {
159 warn!(
160 "Websocket cooldown should be < than state syncronizer cooldown \
161 to avoid spending retries due to disconnected websocket."
162 )
163 }
164 }
165
166 pub fn no_state(mut self, no_state: bool) -> Self {
168 self.no_state = no_state;
169 self
170 }
171
172 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
177 self.auth_key = auth_key;
178 self.no_tls = false;
179 self
180 }
181
182 pub fn no_tls(mut self, no_tls: bool) -> Self {
184 self.no_tls = no_tls;
185 self
186 }
187
188 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
192 self.include_tvl = include_tvl;
193 self
194 }
195
196 pub fn disable_compression(mut self) -> Self {
199 self.compression = false;
200 self
201 }
202
203 pub async fn build(
206 self,
207 ) -> Result<
208 (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
209 StreamError,
210 > {
211 if self.exchanges.is_empty() {
212 return Err(StreamError::SetUpError(
213 "At least one exchange must be registered.".to_string(),
214 ));
215 }
216
217 let auth_key = self
219 .auth_key
220 .clone()
221 .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
222
223 info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
224
225 let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
227 info!("Using non-secure connection: ws:// and http://");
228 let tycho_ws_url = format!("ws://{}", self.tycho_url);
229 let tycho_rpc_url = format!("http://{}", self.tycho_url);
230 (tycho_ws_url, tycho_rpc_url)
231 } else {
232 info!("Using secure connection: wss:// and https://");
233 let tycho_ws_url = format!("wss://{}", self.tycho_url);
234 let tycho_rpc_url = format!("https://{}", self.tycho_url);
235 (tycho_ws_url, tycho_rpc_url)
236 };
237
238 let ws_client = match &self.websockets_retry_config {
240 RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
241 &tycho_ws_url,
242 auth_key.as_deref(),
243 config.max_attempts,
244 config.cooldown,
245 ),
246 }
247 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
248 let rpc_client = HttpRPCClient::new(
249 &tycho_rpc_url,
250 HttpRPCClientOptions::new()
251 .with_auth_key(auth_key)
252 .with_compression(self.compression),
253 )
254 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
255 let ws_jh = ws_client
256 .connect()
257 .await
258 .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
259
260 let mut block_sync = BlockSynchronizer::new(
262 Duration::from_secs(self.block_time),
263 Duration::from_secs(self.timeout),
264 self.max_missed_blocks,
265 );
266
267 self.display_available_protocols(&rpc_client)
268 .await;
269
270 for (name, filter) in self.exchanges {
272 info!("Registering exchange: {}", name);
273 let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
274 let sync = match &self.state_sync_retry_config {
275 RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
276 id.clone(),
277 true,
278 filter,
279 retry_config.max_attempts,
280 retry_config.cooldown,
281 !self.no_state,
282 self.include_tvl,
283 self.compression,
284 rpc_client.clone(),
285 ws_client.clone(),
286 self.block_time + self.timeout,
287 ),
288 };
289 block_sync = block_sync.register_synchronizer(id, sync);
290 }
291
292 let (sync_jh, rx) = block_sync
294 .run()
295 .await
296 .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
297
298 let handle = tokio::spawn(async move {
300 tokio::select! {
301 res = ws_jh => {
302 let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
303 }
304 res = sync_jh => {
305 res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
306 }
307 }
308 if let Err(e) = ws_client.close().await {
309 warn!(?e, "Failed to close WebSocket client");
310 }
311 });
312
313 Ok((handle, rx))
314 }
315
316 async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
319 let available_protocols_set = rpc_client
320 .get_protocol_systems(&ProtocolSystemsRequestBody {
321 chain: self.chain,
322 pagination: PaginationParams { page: 0, page_size: 100 },
323 })
324 .await
325 .map(|resp| {
326 resp.protocol_systems
327 .into_iter()
328 .collect::<HashSet<_>>()
329 })
330 .map_err(|e| {
331 warn!(
332 "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
333 );
334 e
335 })
336 .ok();
337
338 if let Some(not_requested_protocols) = available_protocols_set
339 .map(|available_protocols_set| {
340 let requested_protocol_set = self
341 .exchanges
342 .keys()
343 .cloned()
344 .collect::<HashSet<_>>();
345
346 available_protocols_set
347 .difference(&requested_protocol_set)
348 .cloned()
349 .collect::<Vec<_>>()
350 })
351 .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
352 {
353 info!("Other available protocols: {}", not_requested_protocols.join(", "))
354 }
355 }
356}
357
358#[cfg(test)]
359mod tests {
360 use super::*;
361
362 #[test]
363 fn test_retry_configuration_constant() {
364 let config = RetryConfiguration::constant(5, Duration::from_secs(10));
365 match config {
366 RetryConfiguration::Constant(c) => {
367 assert_eq!(c.max_attempts, 5);
368 assert_eq!(c.cooldown, Duration::from_secs(10));
369 }
370 }
371 }
372
373 #[test]
374 fn test_stream_builder_retry_configs() {
375 let mut builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
376 let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
377 let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));
378
379 builder = builder
380 .websockets_retry_config(&ws_config)
381 .state_synchronizer_retry_config(&state_config);
382
383 match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
385 (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
386 assert_eq!(ws.max_attempts, 10);
387 assert_eq!(ws.cooldown, Duration::from_secs(2));
388 assert_eq!(state.max_attempts, 20);
389 assert_eq!(state.cooldown, Duration::from_secs(5));
390 }
391 }
392 }
393
394 #[test]
395 fn test_default_compression() {
396 let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
397 assert!(builder.compression, "Compression should be enabled by default.");
398 }
399
400 #[tokio::test]
401 async fn test_no_exchanges() {
402 let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
403 .auth_key(Some("my_api_key".into()))
404 .build()
405 .await;
406 assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
407 }
408
409 #[ignore = "require tycho gateway"]
410 #[tokio::test]
411 async fn test_simple_build() {
412 let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
413 let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
414 .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
415 .auth_key(Some(token))
416 .build()
417 .await;
418
419 dbg!(&receiver);
420
421 assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
422 }
423}