1use std::{
2 cmp::max,
3 collections::{HashMap, HashSet},
4 env,
5 time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::{
12 dto::{PaginationLimits, ProtocolSystemsRequestBody},
13 models::{Chain, ExtractorIdentity},
14};
15
16use crate::{
17 deltas::DeltasClient,
18 feed::{
19 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
20 BlockSynchronizer, BlockSynchronizerError, FeedMessage,
21 },
22 rpc::{HttpRPCClientOptions, ProtocolSystemsParams, RPCClient},
23 HttpRPCClient, WsDeltasClient,
24};
25
26#[derive(Error, Debug)]
27pub enum StreamError {
28 #[error("Error during stream set up: {0}")]
29 SetUpError(String),
30
31 #[error("WebSocket client connection error: {0}")]
32 WebSocketConnectionError(String),
33
34 #[error("BlockSynchronizer error: {0}")]
35 BlockSynchronizerError(String),
36}
37
38#[non_exhaustive]
39#[derive(Clone, Debug)]
40pub enum RetryConfiguration {
41 Constant(ConstantRetryConfiguration),
42}
43
44impl RetryConfiguration {
45 pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
46 RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
47 }
48}
49
50#[derive(Clone, Debug)]
51pub struct ConstantRetryConfiguration {
52 max_attempts: u64,
53 cooldown: Duration,
54}
55
56pub struct TychoStreamBuilder {
57 tycho_url: String,
58 chain: Chain,
59 exchanges: HashMap<String, ComponentFilter>,
60 blocklisted_ids: HashSet<String>,
61 block_time: u64,
62 timeout: u64,
63 startup_timeout: Duration,
64 max_missed_blocks: u64,
65 state_sync_retry_config: RetryConfiguration,
66 websockets_retry_config: RetryConfiguration,
67 no_state: bool,
68 auth_key: Option<String>,
69 no_tls: bool,
70 include_tvl: bool,
71 compression: bool,
72 partial_blocks: bool,
73 max_messages: Option<usize>,
74}
75
76impl TychoStreamBuilder {
77 pub fn new(tycho_url: &str, chain: Chain) -> Self {
80 let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
81 Self {
82 tycho_url: tycho_url.to_string(),
83 chain,
84 exchanges: HashMap::new(),
85 blocklisted_ids: HashSet::new(),
86 block_time,
87 timeout,
88 startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
89 max_missed_blocks,
90 state_sync_retry_config: RetryConfiguration::constant(
91 32,
92 Duration::from_secs(max(block_time / 4, 2)),
93 ),
94 websockets_retry_config: RetryConfiguration::constant(
95 128,
96 Duration::from_secs(max(block_time / 6, 1)),
97 ),
98 no_state: false,
99 auth_key: None,
100 no_tls: true,
101 include_tvl: false,
102 compression: true,
103 partial_blocks: false,
104 max_messages: None,
105 }
106 }
107
108 fn default_timing(chain: &Chain) -> (u64, u64, u64) {
111 match chain {
112 Chain::Ethereum => (12, 36, 50),
113 Chain::Starknet => (2, 8, 50),
114 Chain::ZkSync => (3, 12, 50),
115 Chain::Arbitrum => (1, 2, 100), Chain::Base => (2, 12, 50),
117 Chain::Bsc => (1, 12, 50),
118 Chain::Unichain => (1, 10, 100),
119 Chain::Polygon => (2, 12, 50), }
121 }
122
123 pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
125 self.exchanges
126 .insert(name.to_string(), filter);
127 self
128 }
129
130 pub fn block_time(mut self, block_time: u64) -> Self {
132 self.block_time = block_time;
133 self
134 }
135
136 pub fn timeout(mut self, timeout: u64) -> Self {
138 self.timeout = timeout;
139 self
140 }
141
142 pub fn startup_timeout(mut self, timeout: Duration) -> Self {
143 self.startup_timeout = timeout;
144 self
145 }
146
147 pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
148 self.max_missed_blocks = max_missed_blocks;
149 self
150 }
151
152 pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
153 self.websockets_retry_config = retry_config.clone();
154 self.warn_on_potential_timing_issues();
155 self
156 }
157
158 pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
159 self.state_sync_retry_config = retry_config.clone();
160 self.warn_on_potential_timing_issues();
161 self
162 }
163
164 fn warn_on_potential_timing_issues(&self) {
165 let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
166 (&self.state_sync_retry_config, &self.websockets_retry_config);
167
168 if ws_config.cooldown >= state_config.cooldown {
169 warn!(
170 "Websocket cooldown should be < than state syncronizer cooldown \
171 to avoid spending retries due to disconnected websocket."
172 )
173 }
174 }
175
176 pub fn no_state(mut self, no_state: bool) -> Self {
178 self.no_state = no_state;
179 self
180 }
181
182 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
187 self.auth_key = auth_key;
188 self.no_tls = false;
189 self
190 }
191
192 pub fn no_tls(mut self, no_tls: bool) -> Self {
194 self.no_tls = no_tls;
195 self
196 }
197
198 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
202 self.include_tvl = include_tvl;
203 self
204 }
205
206 pub fn disable_compression(mut self) -> Self {
209 self.compression = false;
210 self
211 }
212
213 pub fn enable_partial_blocks(mut self) -> Self {
215 self.partial_blocks = true;
216 self
217 }
218
219 pub fn max_messages(mut self, n: usize) -> Self {
222 self.max_messages = Some(n);
223 self
224 }
225
226 pub fn max_retries(mut self, max_retries: u64) -> Self {
229 let cooldown = match &self.state_sync_retry_config {
230 RetryConfiguration::Constant(c) => c.cooldown,
231 };
232 self.state_sync_retry_config = RetryConfiguration::constant(max_retries, cooldown);
233 self
234 }
235
236 pub fn blocklisted_ids(mut self, ids: impl IntoIterator<Item = String>) -> Self {
241 self.blocklisted_ids.extend(ids);
242 self
243 }
244
245 pub async fn build(
248 self,
249 ) -> Result<
250 (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
251 StreamError,
252 > {
253 if self.exchanges.is_empty() {
254 return Err(StreamError::SetUpError(
255 "At least one exchange must be registered.".to_string(),
256 ));
257 }
258
259 let auth_key = self
261 .auth_key
262 .clone()
263 .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
264
265 info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
266
267 let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
269 info!("Using non-secure connection: ws:// and http://");
270 let tycho_ws_url = format!("ws://{}", self.tycho_url);
271 let tycho_rpc_url = format!("http://{}", self.tycho_url);
272 (tycho_ws_url, tycho_rpc_url)
273 } else {
274 info!("Using secure connection: wss:// and https://");
275 let tycho_ws_url = format!("wss://{}", self.tycho_url);
276 let tycho_rpc_url = format!("https://{}", self.tycho_url);
277 (tycho_ws_url, tycho_rpc_url)
278 };
279
280 let ws_client = match &self.websockets_retry_config {
282 RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
283 &tycho_ws_url,
284 auth_key.as_deref(),
285 config.max_attempts,
286 config.cooldown,
287 ),
288 }
289 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
290 let rpc_client = HttpRPCClient::new(
291 &tycho_rpc_url,
292 HttpRPCClientOptions::new()
293 .with_auth_key(auth_key)
294 .with_compression(self.compression),
295 )
296 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
297 let ws_jh = ws_client
298 .connect()
299 .await
300 .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
301
302 let mut block_sync = BlockSynchronizer::new(
304 Duration::from_secs(self.block_time),
305 Duration::from_secs(self.timeout),
306 self.max_missed_blocks,
307 );
308 if let Some(n) = self.max_messages {
309 block_sync.max_messages(n);
310 }
311
312 let requested: HashSet<_> = self.exchanges.keys().cloned().collect();
313 let info = ProtocolSystemsInfo::fetch(&rpc_client, self.chain, &requested).await;
314 info.log_other_available();
315 let dci_protocols = info.dci_protocols;
316
317 for (name, filter) in self
319 .exchanges
320 .into_iter()
321 .map(|(name, filter)| {
322 let filter = if self.blocklisted_ids.is_empty() {
323 filter
324 } else {
325 filter.blocklist(self.blocklisted_ids.iter().cloned())
326 };
327 (name, filter)
328 })
329 {
330 info!("Registering exchange: {}", name);
331 let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
332 let uses_dci = dci_protocols.contains(&name);
333 let sync = match &self.state_sync_retry_config {
334 RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
335 id.clone(),
336 true,
337 filter,
338 retry_config.max_attempts,
339 retry_config.cooldown,
340 !self.no_state,
341 self.include_tvl,
342 self.compression,
343 rpc_client.clone(),
344 ws_client.clone(),
345 self.block_time + self.timeout,
346 )
347 .with_dci(uses_dci)
348 .with_partial_blocks(self.partial_blocks),
349 };
350 block_sync = block_sync.register_synchronizer(id, sync);
351 }
352
353 let (sync_jh, rx) = block_sync
355 .run()
356 .await
357 .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
358
359 let handle = tokio::spawn(async move {
361 tokio::select! {
362 res = ws_jh => {
363 let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
364 }
365 res = sync_jh => {
366 res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
367 }
368 }
369 if let Err(e) = ws_client.close().await {
370 warn!(?e, "Failed to close WebSocket client");
371 }
372 });
373
374 Ok((handle, rx))
375 }
376}
377
378pub struct ProtocolSystemsInfo {
381 pub dci_protocols: HashSet<String>,
382 pub other_available: HashSet<String>,
383}
384
385impl ProtocolSystemsInfo {
386 pub async fn fetch(
389 rpc_client: &HttpRPCClient,
390 chain: Chain,
391 requested_exchanges: &HashSet<String>,
392 ) -> Self {
393 let page_size =
394 ProtocolSystemsRequestBody::effective_max_page_size(rpc_client.compression());
395 let params = ProtocolSystemsParams::new(chain).with_pagination(0, page_size);
396 let response = rpc_client
397 .get_protocol_systems(params)
398 .await
399 .map_err(|e| {
400 warn!(
401 "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
402 );
403 e
404 })
405 .ok();
406
407 let Some(response) = response else {
408 return Self { dci_protocols: HashSet::new(), other_available: HashSet::new() };
409 };
410
411 if response.total() > page_size {
412 warn!(
413 "Server has {} protocol systems but only {} were fetched (page_size={page_size}). \
414 Availability info may be incomplete.",
415 response.total(),
416 response.data().protocol_systems().len(),
417 );
418 }
419
420 let available: HashSet<_> = response
421 .data()
422 .protocol_systems()
423 .iter()
424 .cloned()
425 .collect();
426 let other_available = available
427 .difference(requested_exchanges)
428 .cloned()
429 .collect();
430 let mut dci_protocols: HashSet<String> = response
431 .data()
432 .dci_protocols()
433 .iter()
434 .cloned()
435 .collect();
436
437 if dci_protocols.is_empty() {
442 const LEGACY_DCI: &[&str] = &[
443 "uniswap_v4_hooks",
444 "vm:curve",
445 "vm:balancer_v2",
446 "vm:balancer_v3",
447 "fluid_v1",
448 "erc4626",
449 ];
450 for name in requested_exchanges {
451 if LEGACY_DCI.contains(&name.as_str()) {
452 dci_protocols.insert(name.clone());
453 }
454 }
455 }
456
457 Self { dci_protocols, other_available }
458 }
459
460 pub fn log_other_available(&self) {
462 if !self.other_available.is_empty() {
463 let names: Vec<_> = self
464 .other_available
465 .iter()
466 .cloned()
467 .collect();
468 info!("Other available protocols: {}", names.join(", "));
469 }
470 }
471}
472
473#[cfg(test)]
474mod tests {
475 use super::*;
476
477 #[test]
478 fn test_retry_configuration_constant() {
479 let config = RetryConfiguration::constant(5, Duration::from_secs(10));
480 match config {
481 RetryConfiguration::Constant(c) => {
482 assert_eq!(c.max_attempts, 5);
483 assert_eq!(c.cooldown, Duration::from_secs(10));
484 }
485 }
486 }
487
488 #[test]
489 fn test_stream_builder_retry_configs() {
490 let mut builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
491 let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
492 let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));
493
494 builder = builder
495 .websockets_retry_config(&ws_config)
496 .state_synchronizer_retry_config(&state_config);
497
498 match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
500 (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
501 assert_eq!(ws.max_attempts, 10);
502 assert_eq!(ws.cooldown, Duration::from_secs(2));
503 assert_eq!(state.max_attempts, 20);
504 assert_eq!(state.cooldown, Duration::from_secs(5));
505 }
506 }
507 }
508
509 #[test]
510 fn test_default_stream_builder() {
511 let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
512 assert!(builder.compression, "Compression should be enabled by default.");
513 assert!(!builder.partial_blocks, "partial_blocks should be disabled by default.");
514 }
515
516 #[tokio::test]
517 async fn test_no_exchanges() {
518 let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
519 .auth_key(Some("my_api_key".into()))
520 .build()
521 .await;
522 assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
523 }
524
525 #[ignore = "require tycho gateway"]
526 #[tokio::test]
527 async fn test_simple_build() {
528 let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
529 let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
530 .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
531 .auth_key(Some(token))
532 .build()
533 .await;
534
535 dbg!(&receiver);
536
537 assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
538 }
539}