1use std::{
2 cmp::max,
3 collections::{HashMap, HashSet},
4 env,
5 time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{
12 Chain, ExtractorIdentity, PaginationLimits, PaginationParams, ProtocolSystemsRequestBody,
13};
14
15use crate::{
16 deltas::DeltasClient,
17 feed::{
18 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
19 BlockSynchronizer, BlockSynchronizerError, FeedMessage,
20 },
21 rpc::{HttpRPCClientOptions, RPCClient},
22 HttpRPCClient, WsDeltasClient,
23};
24
25#[derive(Error, Debug)]
26pub enum StreamError {
27 #[error("Error during stream set up: {0}")]
28 SetUpError(String),
29
30 #[error("WebSocket client connection error: {0}")]
31 WebSocketConnectionError(String),
32
33 #[error("BlockSynchronizer error: {0}")]
34 BlockSynchronizerError(String),
35}
36
/// Strategy used when retrying a failed operation.
///
/// Marked `#[non_exhaustive]` so further strategies can be added without breaking
/// downstream `match` expressions.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub enum RetryConfiguration {
    /// Retry a bounded number of times, waiting the same cooldown between attempts.
    Constant(ConstantRetryConfiguration),
}

impl RetryConfiguration {
    /// Builds a [`RetryConfiguration::Constant`] from an attempt limit and a fixed cooldown.
    pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
        let settings = ConstantRetryConfiguration { max_attempts, cooldown };
        Self::Constant(settings)
    }
}

/// Settings for the constant retry strategy.
#[derive(Clone, Debug)]
pub struct ConstantRetryConfiguration {
    /// Maximum number of attempts.
    max_attempts: u64,
    /// Fixed wait between two consecutive attempts.
    cooldown: Duration,
}
54
55pub struct TychoStreamBuilder {
56 tycho_url: String,
57 chain: Chain,
58 exchanges: HashMap<String, ComponentFilter>,
59 blocklisted_ids: HashSet<String>,
60 block_time: u64,
61 timeout: u64,
62 startup_timeout: Duration,
63 max_missed_blocks: u64,
64 state_sync_retry_config: RetryConfiguration,
65 websockets_retry_config: RetryConfiguration,
66 no_state: bool,
67 auth_key: Option<String>,
68 no_tls: bool,
69 include_tvl: bool,
70 compression: bool,
71 partial_blocks: bool,
72}
73
74impl TychoStreamBuilder {
75 pub fn new(tycho_url: &str, chain: Chain) -> Self {
78 let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
79 Self {
80 tycho_url: tycho_url.to_string(),
81 chain,
82 exchanges: HashMap::new(),
83 blocklisted_ids: HashSet::new(),
84 block_time,
85 timeout,
86 startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
87 max_missed_blocks,
88 state_sync_retry_config: RetryConfiguration::constant(
89 32,
90 Duration::from_secs(max(block_time / 4, 2)),
91 ),
92 websockets_retry_config: RetryConfiguration::constant(
93 128,
94 Duration::from_secs(max(block_time / 6, 1)),
95 ),
96 no_state: false,
97 auth_key: None,
98 no_tls: true,
99 include_tvl: false,
100 compression: true,
101 partial_blocks: false,
102 }
103 }
104
105 fn default_timing(chain: &Chain) -> (u64, u64, u64) {
108 match chain {
109 Chain::Ethereum => (12, 36, 50),
110 Chain::Starknet => (2, 8, 50),
111 Chain::ZkSync => (3, 12, 50),
112 Chain::Arbitrum => (1, 2, 100), Chain::Base => (2, 12, 50),
114 Chain::Bsc => (1, 12, 50),
115 Chain::Unichain => (1, 10, 100),
116 Chain::Polygon => (2, 12, 50), }
118 }
119
120 pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
122 self.exchanges
123 .insert(name.to_string(), filter);
124 self
125 }
126
127 pub fn block_time(mut self, block_time: u64) -> Self {
129 self.block_time = block_time;
130 self
131 }
132
133 pub fn timeout(mut self, timeout: u64) -> Self {
135 self.timeout = timeout;
136 self
137 }
138
139 pub fn startup_timeout(mut self, timeout: Duration) -> Self {
140 self.startup_timeout = timeout;
141 self
142 }
143
144 pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
145 self.max_missed_blocks = max_missed_blocks;
146 self
147 }
148
149 pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
150 self.websockets_retry_config = retry_config.clone();
151 self.warn_on_potential_timing_issues();
152 self
153 }
154
155 pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
156 self.state_sync_retry_config = retry_config.clone();
157 self.warn_on_potential_timing_issues();
158 self
159 }
160
161 fn warn_on_potential_timing_issues(&self) {
162 let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
163 (&self.state_sync_retry_config, &self.websockets_retry_config);
164
165 if ws_config.cooldown >= state_config.cooldown {
166 warn!(
167 "Websocket cooldown should be < than state syncronizer cooldown \
168 to avoid spending retries due to disconnected websocket."
169 )
170 }
171 }
172
173 pub fn no_state(mut self, no_state: bool) -> Self {
175 self.no_state = no_state;
176 self
177 }
178
179 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
184 self.auth_key = auth_key;
185 self.no_tls = false;
186 self
187 }
188
189 pub fn no_tls(mut self, no_tls: bool) -> Self {
191 self.no_tls = no_tls;
192 self
193 }
194
195 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
199 self.include_tvl = include_tvl;
200 self
201 }
202
203 pub fn disable_compression(mut self) -> Self {
206 self.compression = false;
207 self
208 }
209
210 pub fn enable_partial_blocks(mut self) -> Self {
212 self.partial_blocks = true;
213 self
214 }
215
216 pub fn blocklisted_ids(mut self, ids: impl IntoIterator<Item = String>) -> Self {
221 self.blocklisted_ids.extend(ids);
222 self
223 }
224
225 pub async fn build(
228 self,
229 ) -> Result<
230 (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
231 StreamError,
232 > {
233 if self.exchanges.is_empty() {
234 return Err(StreamError::SetUpError(
235 "At least one exchange must be registered.".to_string(),
236 ));
237 }
238
239 let auth_key = self
241 .auth_key
242 .clone()
243 .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
244
245 info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
246
247 let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
249 info!("Using non-secure connection: ws:// and http://");
250 let tycho_ws_url = format!("ws://{}", self.tycho_url);
251 let tycho_rpc_url = format!("http://{}", self.tycho_url);
252 (tycho_ws_url, tycho_rpc_url)
253 } else {
254 info!("Using secure connection: wss:// and https://");
255 let tycho_ws_url = format!("wss://{}", self.tycho_url);
256 let tycho_rpc_url = format!("https://{}", self.tycho_url);
257 (tycho_ws_url, tycho_rpc_url)
258 };
259
260 let ws_client = match &self.websockets_retry_config {
262 RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
263 &tycho_ws_url,
264 auth_key.as_deref(),
265 config.max_attempts,
266 config.cooldown,
267 ),
268 }
269 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
270 let rpc_client = HttpRPCClient::new(
271 &tycho_rpc_url,
272 HttpRPCClientOptions::new()
273 .with_auth_key(auth_key)
274 .with_compression(self.compression),
275 )
276 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
277 let ws_jh = ws_client
278 .connect()
279 .await
280 .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
281
282 let mut block_sync = BlockSynchronizer::new(
284 Duration::from_secs(self.block_time),
285 Duration::from_secs(self.timeout),
286 self.max_missed_blocks,
287 );
288
289 let requested: HashSet<_> = self.exchanges.keys().cloned().collect();
290 let info = ProtocolSystemsInfo::fetch(&rpc_client, self.chain, &requested).await;
291 info.log_other_available();
292 let dci_protocols = info.dci_protocols;
293
294 for (name, filter) in self
296 .exchanges
297 .into_iter()
298 .map(|(name, filter)| {
299 let filter = if self.blocklisted_ids.is_empty() {
300 filter
301 } else {
302 filter.blocklist(self.blocklisted_ids.iter().cloned())
303 };
304 (name, filter)
305 })
306 {
307 info!("Registering exchange: {}", name);
308 let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
309 let uses_dci = dci_protocols.contains(&name);
310 let sync = match &self.state_sync_retry_config {
311 RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
312 id.clone(),
313 true,
314 filter,
315 retry_config.max_attempts,
316 retry_config.cooldown,
317 !self.no_state,
318 self.include_tvl,
319 self.compression,
320 rpc_client.clone(),
321 ws_client.clone(),
322 self.block_time + self.timeout,
323 )
324 .with_dci(uses_dci)
325 .with_partial_blocks(self.partial_blocks),
326 };
327 block_sync = block_sync.register_synchronizer(id, sync);
328 }
329
330 let (sync_jh, rx) = block_sync
332 .run()
333 .await
334 .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
335
336 let handle = tokio::spawn(async move {
338 tokio::select! {
339 res = ws_jh => {
340 let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
341 }
342 res = sync_jh => {
343 res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
344 }
345 }
346 if let Err(e) = ws_client.close().await {
347 warn!(?e, "Failed to close WebSocket client");
348 }
349 });
350
351 Ok((handle, rx))
352 }
353}
354
/// Protocol system information gathered from the server by [`ProtocolSystemsInfo::fetch`].
pub struct ProtocolSystemsInfo {
    /// Protocols for which `TychoStreamBuilder::build` enables DCI on the synchronizer.
    pub dci_protocols: HashSet<String>,
    /// Protocols the server reported that were not among the requested exchanges.
    pub other_available: HashSet<String>,
}
361
362impl ProtocolSystemsInfo {
363 pub async fn fetch(
366 rpc_client: &HttpRPCClient,
367 chain: Chain,
368 requested_exchanges: &HashSet<String>,
369 ) -> Self {
370 let page_size =
371 ProtocolSystemsRequestBody::effective_max_page_size(rpc_client.compression());
372 let response = rpc_client
373 .get_protocol_systems(&ProtocolSystemsRequestBody {
374 chain,
375 pagination: PaginationParams { page: 0, page_size },
376 })
377 .await
378 .map_err(|e| {
379 warn!(
380 "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
381 );
382 e
383 })
384 .ok();
385
386 let Some(response) = response else {
387 return Self { dci_protocols: HashSet::new(), other_available: HashSet::new() };
388 };
389
390 if response.pagination.total > page_size {
391 warn!(
392 "Server has {} protocol systems but only {} were fetched (page_size={page_size}). \
393 Availability info may be incomplete.",
394 response.pagination.total,
395 response.protocol_systems.len(),
396 );
397 }
398
399 let available: HashSet<_> = response
400 .protocol_systems
401 .into_iter()
402 .collect();
403 let other_available = available
404 .difference(requested_exchanges)
405 .cloned()
406 .collect();
407 let mut dci_protocols: HashSet<String> = response
408 .dci_protocols
409 .into_iter()
410 .collect();
411
412 if dci_protocols.is_empty() {
417 const LEGACY_DCI: &[&str] = &[
418 "uniswap_v4_hooks",
419 "vm:curve",
420 "vm:balancer_v2",
421 "vm:balancer_v3",
422 "fluid_v1",
423 "erc4626",
424 ];
425 for name in requested_exchanges {
426 if LEGACY_DCI.contains(&name.as_str()) {
427 dci_protocols.insert(name.clone());
428 }
429 }
430 }
431
432 Self { dci_protocols, other_available }
433 }
434
435 pub fn log_other_available(&self) {
437 if !self.other_available.is_empty() {
438 let names: Vec<_> = self
439 .other_available
440 .iter()
441 .cloned()
442 .collect();
443 info!("Other available protocols: {}", names.join(", "));
444 }
445 }
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// `RetryConfiguration::constant` stores the attempt limit and cooldown unchanged.
    #[test]
    fn test_retry_configuration_constant() {
        let config = RetryConfiguration::constant(5, Duration::from_secs(10));
        match config {
            RetryConfiguration::Constant(c) => {
                assert_eq!(c.max_attempts, 5);
                assert_eq!(c.cooldown, Duration::from_secs(10));
            }
        }
    }

    /// Both retry setters store the configuration they were given.
    #[test]
    fn test_stream_builder_retry_configs() {
        let mut builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));

        builder = builder
            .websockets_retry_config(&ws_config)
            .state_synchronizer_retry_config(&state_config);

        match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
            (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
                assert_eq!(ws.max_attempts, 10);
                assert_eq!(ws.cooldown, Duration::from_secs(2));
                assert_eq!(state.max_attempts, 20);
                assert_eq!(state.cooldown, Duration::from_secs(5));
            }
        }
    }

    /// A fresh builder has compression on and partial blocks off.
    #[test]
    fn test_default_stream_builder() {
        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
        assert!(builder.compression, "Compression should be enabled by default.");
        assert!(!builder.partial_blocks, "partial_blocks should be disabled by default.");
    }

    /// `build` fails before any connection attempt when no exchange is registered.
    #[tokio::test]
    async fn test_no_exchanges() {
        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .auth_key(Some("my_api_key".into()))
            .build()
            .await;
        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
    }

    /// End-to-end build against a live gateway; needs `TYCHO_AUTH_TOKEN` to be set.
    #[ignore = "require tycho gateway"]
    #[tokio::test]
    async fn test_simple_build() {
        let token = env::var("TYCHO_AUTH_TOKEN")
            .expect("TYCHO_AUTH_TOKEN must be set to run this test");
        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
            .auth_key(Some(token))
            .build()
            .await;

        // Surface the error in the failure message instead of dumping the value with `dbg!`.
        assert!(
            receiver.is_ok(),
            "Client should build successfully with exchanges registered. Error: {:?}",
            receiver.as_ref().err()
        );
    }
}