1use std::{
2 cmp::max,
3 collections::{HashMap, HashSet},
4 env,
5 time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{
12 Chain, ExtractorIdentity, PaginationLimits, PaginationParams, ProtocolSystemsRequestBody,
13};
14
15use crate::{
16 deltas::DeltasClient,
17 feed::{
18 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
19 BlockSynchronizer, BlockSynchronizerError, FeedMessage,
20 },
21 rpc::{HttpRPCClientOptions, RPCClient},
22 HttpRPCClient, WsDeltasClient,
23};
24
/// Errors returned by [`TychoStreamBuilder::build`].
///
/// Every variant carries the stringified underlying error.
#[derive(Error, Debug)]
pub enum StreamError {
    /// The stream could not be set up: no exchange was registered, or one of the
    /// clients could not be constructed.
    #[error("Error during stream set up: {0}")]
    SetUpError(String),

    /// The WebSocket client failed to connect.
    #[error("WebSocket client connection error: {0}")]
    WebSocketConnectionError(String),

    /// The block synchronizer failed to start.
    #[error("BlockSynchronizer error: {0}")]
    BlockSynchronizerError(String),
}
36
/// Retry policy used by the stream builder for its WebSocket client and its
/// state synchronizers.
///
/// Marked `#[non_exhaustive]`, so downstream crates must not assume `Constant`
/// is the only variant.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub enum RetryConfiguration {
    /// A fixed attempt budget with a fixed cooldown.
    Constant(ConstantRetryConfiguration),
}
42
43impl RetryConfiguration {
44 pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
45 RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
46 }
47}
48
/// Parameters of a constant retry policy; constructed through
/// [`RetryConfiguration::constant`].
#[derive(Clone, Debug)]
pub struct ConstantRetryConfiguration {
    /// Attempt budget handed to the retrying component.
    max_attempts: u64,
    /// Fixed cooldown handed to the retrying component.
    cooldown: Duration,
}
54
/// Builder that configures and starts a Tycho feed stream.
///
/// Create it with [`TychoStreamBuilder::new`], register at least one exchange,
/// then call [`TychoStreamBuilder::build`].
pub struct TychoStreamBuilder {
    /// Server address without a scheme; `build` prepends `ws(s)://` and `http(s)://`.
    tycho_url: String,
    /// Chain the stream is created for.
    chain: Chain,
    /// Registered exchanges: protocol system name -> component filter.
    exchanges: HashMap<String, ComponentFilter>,
    /// Block time in seconds (converted with `Duration::from_secs` in `build`).
    block_time: u64,
    /// Timeout in seconds (converted with `Duration::from_secs` in `build`).
    timeout: u64,
    /// Startup timeout.
    // NOTE(review): this field is only written in the visible code; `build` never reads it.
    startup_timeout: Duration,
    /// Passed to `BlockSynchronizer::new` together with `block_time` and `timeout`.
    max_missed_blocks: u64,
    /// Retry policy handed to every `ProtocolStateSynchronizer`.
    state_sync_retry_config: RetryConfiguration,
    /// Retry policy handed to the WebSocket deltas client.
    websockets_retry_config: RetryConfiguration,
    /// When `true`, synchronizers are created with the state flag disabled
    /// (`build` passes `!no_state`).
    no_state: bool,
    /// Explicit auth key; when `None`, `build` falls back to `TYCHO_AUTH_TOKEN`.
    auth_key: Option<String>,
    /// When `true`, `build` uses `ws://` / `http://` instead of `wss://` / `https://`.
    no_tls: bool,
    /// Forwarded to every `ProtocolStateSynchronizer`.
    include_tvl: bool,
    /// Forwarded to the HTTP RPC client options and to every synchronizer.
    compression: bool,
    /// Forwarded to every synchronizer via `with_partial_blocks`.
    partial_blocks: bool,
}
72
73impl TychoStreamBuilder {
74 pub fn new(tycho_url: &str, chain: Chain) -> Self {
77 let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
78 Self {
79 tycho_url: tycho_url.to_string(),
80 chain,
81 exchanges: HashMap::new(),
82 block_time,
83 timeout,
84 startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
85 max_missed_blocks,
86 state_sync_retry_config: RetryConfiguration::constant(
87 32,
88 Duration::from_secs(max(block_time / 4, 2)),
89 ),
90 websockets_retry_config: RetryConfiguration::constant(
91 128,
92 Duration::from_secs(max(block_time / 6, 1)),
93 ),
94 no_state: false,
95 auth_key: None,
96 no_tls: true,
97 include_tvl: false,
98 compression: true,
99 partial_blocks: false,
100 }
101 }
102
103 fn default_timing(chain: &Chain) -> (u64, u64, u64) {
106 match chain {
107 Chain::Ethereum => (12, 36, 50),
108 Chain::Starknet => (2, 8, 50),
109 Chain::ZkSync => (3, 12, 50),
110 Chain::Arbitrum => (1, 2, 100), Chain::Base => (2, 12, 50),
112 Chain::Bsc => (1, 12, 50),
113 Chain::Unichain => (1, 10, 100),
114 }
115 }
116
117 pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
119 self.exchanges
120 .insert(name.to_string(), filter);
121 self
122 }
123
124 pub fn block_time(mut self, block_time: u64) -> Self {
126 self.block_time = block_time;
127 self
128 }
129
130 pub fn timeout(mut self, timeout: u64) -> Self {
132 self.timeout = timeout;
133 self
134 }
135
136 pub fn startup_timeout(mut self, timeout: Duration) -> Self {
137 self.startup_timeout = timeout;
138 self
139 }
140
141 pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
142 self.max_missed_blocks = max_missed_blocks;
143 self
144 }
145
146 pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
147 self.websockets_retry_config = retry_config.clone();
148 self.warn_on_potential_timing_issues();
149 self
150 }
151
152 pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
153 self.state_sync_retry_config = retry_config.clone();
154 self.warn_on_potential_timing_issues();
155 self
156 }
157
158 fn warn_on_potential_timing_issues(&self) {
159 let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
160 (&self.state_sync_retry_config, &self.websockets_retry_config);
161
162 if ws_config.cooldown >= state_config.cooldown {
163 warn!(
164 "Websocket cooldown should be < than state syncronizer cooldown \
165 to avoid spending retries due to disconnected websocket."
166 )
167 }
168 }
169
170 pub fn no_state(mut self, no_state: bool) -> Self {
172 self.no_state = no_state;
173 self
174 }
175
176 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
181 self.auth_key = auth_key;
182 self.no_tls = false;
183 self
184 }
185
186 pub fn no_tls(mut self, no_tls: bool) -> Self {
188 self.no_tls = no_tls;
189 self
190 }
191
192 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
196 self.include_tvl = include_tvl;
197 self
198 }
199
200 pub fn disable_compression(mut self) -> Self {
203 self.compression = false;
204 self
205 }
206
207 pub fn enable_partial_blocks(mut self) -> Self {
209 self.partial_blocks = true;
210 self
211 }
212
213 pub async fn build(
216 self,
217 ) -> Result<
218 (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
219 StreamError,
220 > {
221 if self.exchanges.is_empty() {
222 return Err(StreamError::SetUpError(
223 "At least one exchange must be registered.".to_string(),
224 ));
225 }
226
227 let auth_key = self
229 .auth_key
230 .clone()
231 .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
232
233 info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
234
235 let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
237 info!("Using non-secure connection: ws:// and http://");
238 let tycho_ws_url = format!("ws://{}", self.tycho_url);
239 let tycho_rpc_url = format!("http://{}", self.tycho_url);
240 (tycho_ws_url, tycho_rpc_url)
241 } else {
242 info!("Using secure connection: wss:// and https://");
243 let tycho_ws_url = format!("wss://{}", self.tycho_url);
244 let tycho_rpc_url = format!("https://{}", self.tycho_url);
245 (tycho_ws_url, tycho_rpc_url)
246 };
247
248 let ws_client = match &self.websockets_retry_config {
250 RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
251 &tycho_ws_url,
252 auth_key.as_deref(),
253 config.max_attempts,
254 config.cooldown,
255 ),
256 }
257 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
258 let rpc_client = HttpRPCClient::new(
259 &tycho_rpc_url,
260 HttpRPCClientOptions::new()
261 .with_auth_key(auth_key)
262 .with_compression(self.compression),
263 )
264 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
265 let ws_jh = ws_client
266 .connect()
267 .await
268 .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
269
270 let mut block_sync = BlockSynchronizer::new(
272 Duration::from_secs(self.block_time),
273 Duration::from_secs(self.timeout),
274 self.max_missed_blocks,
275 );
276
277 let requested: HashSet<_> = self.exchanges.keys().cloned().collect();
278 let info = ProtocolSystemsInfo::fetch(&rpc_client, self.chain, &requested).await;
279 info.log_other_available();
280 let dci_protocols = info.dci_protocols;
281
282 for (name, filter) in self.exchanges {
284 info!("Registering exchange: {}", name);
285 let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
286 let uses_dci = dci_protocols.contains(&name);
287 let sync = match &self.state_sync_retry_config {
288 RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
289 id.clone(),
290 true,
291 filter,
292 retry_config.max_attempts,
293 retry_config.cooldown,
294 !self.no_state,
295 self.include_tvl,
296 self.compression,
297 rpc_client.clone(),
298 ws_client.clone(),
299 self.block_time + self.timeout,
300 )
301 .with_dci(uses_dci)
302 .with_partial_blocks(self.partial_blocks),
303 };
304 block_sync = block_sync.register_synchronizer(id, sync);
305 }
306
307 let (sync_jh, rx) = block_sync
309 .run()
310 .await
311 .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
312
313 let handle = tokio::spawn(async move {
315 tokio::select! {
316 res = ws_jh => {
317 let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
318 }
319 res = sync_jh => {
320 res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
321 }
322 }
323 if let Err(e) = ws_client.close().await {
324 warn!(?e, "Failed to close WebSocket client");
325 }
326 });
327
328 Ok((handle, rx))
329 }
330}
331
/// Protocol system information derived from the server's protocol systems response.
pub struct ProtocolSystemsInfo {
    /// Protocols reported by the server as using DCI, or, when the server reports
    /// none, the requested protocols found in a built-in legacy list.
    pub dci_protocols: HashSet<String>,
    /// Protocols available on the server that were not requested.
    pub other_available: HashSet<String>,
}
338
339impl ProtocolSystemsInfo {
340 pub async fn fetch(
343 rpc_client: &HttpRPCClient,
344 chain: Chain,
345 requested_exchanges: &HashSet<String>,
346 ) -> Self {
347 let page_size =
348 ProtocolSystemsRequestBody::effective_max_page_size(rpc_client.compression());
349 let response = rpc_client
350 .get_protocol_systems(&ProtocolSystemsRequestBody {
351 chain,
352 pagination: PaginationParams { page: 0, page_size },
353 })
354 .await
355 .map_err(|e| {
356 warn!(
357 "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
358 );
359 e
360 })
361 .ok();
362
363 let Some(response) = response else {
364 return Self { dci_protocols: HashSet::new(), other_available: HashSet::new() };
365 };
366
367 if response.pagination.total > page_size {
368 warn!(
369 "Server has {} protocol systems but only {} were fetched (page_size={page_size}). \
370 Availability info may be incomplete.",
371 response.pagination.total,
372 response.protocol_systems.len(),
373 );
374 }
375
376 let available: HashSet<_> = response
377 .protocol_systems
378 .into_iter()
379 .collect();
380 let other_available = available
381 .difference(requested_exchanges)
382 .cloned()
383 .collect();
384 let mut dci_protocols: HashSet<String> = response
385 .dci_protocols
386 .into_iter()
387 .collect();
388
389 if dci_protocols.is_empty() {
394 const LEGACY_DCI: &[&str] = &[
395 "uniswap_v4_hooks",
396 "vm:curve",
397 "vm:balancer_v2",
398 "vm:balancer_v3",
399 "fluid_v1",
400 "erc4626",
401 ];
402 for name in requested_exchanges {
403 if LEGACY_DCI.contains(&name.as_str()) {
404 dci_protocols.insert(name.clone());
405 }
406 }
407 }
408
409 Self { dci_protocols, other_available }
410 }
411
412 pub fn log_other_available(&self) {
414 if !self.other_available.is_empty() {
415 let names: Vec<_> = self
416 .other_available
417 .iter()
418 .cloned()
419 .collect();
420 info!("Other available protocols: {}", names.join(", "));
421 }
422 }
423}
424
#[cfg(test)]
mod tests {
    use super::*;

    /// `RetryConfiguration::constant` stores both arguments unchanged.
    #[test]
    fn test_retry_configuration_constant() {
        let config = RetryConfiguration::constant(5, Duration::from_secs(10));
        // `Constant` is the only variant, so this pattern is irrefutable in-crate.
        let RetryConfiguration::Constant(c) = config;
        assert_eq!(c.max_attempts, 5);
        assert_eq!(c.cooldown, Duration::from_secs(10));
    }

    /// Both retry setters store the configuration they were given.
    #[test]
    fn test_stream_builder_retry_configs() {
        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));

        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .websockets_retry_config(&ws_config)
            .state_synchronizer_retry_config(&state_config);

        let RetryConfiguration::Constant(ws) = &builder.websockets_retry_config;
        let RetryConfiguration::Constant(state) = &builder.state_sync_retry_config;
        assert_eq!(ws.max_attempts, 10);
        assert_eq!(ws.cooldown, Duration::from_secs(2));
        assert_eq!(state.max_attempts, 20);
        assert_eq!(state.cooldown, Duration::from_secs(5));
    }

    /// A fresh builder has compression on and partial blocks off.
    #[test]
    fn test_default_stream_builder() {
        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
        assert!(builder.compression, "Compression should be enabled by default.");
        assert!(!builder.partial_blocks, "partial_blocks should be disabled by default.");
    }

    /// `build` rejects a builder without exchanges before doing any network I/O.
    #[tokio::test]
    async fn test_no_exchanges() {
        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .auth_key(Some("my_api_key".into()))
            .build()
            .await;
        assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
    }

    /// End-to-end build against a live gateway; needs `TYCHO_AUTH_TOKEN`.
    #[ignore = "require tycho gateway"]
    #[tokio::test]
    async fn test_simple_build() {
        let token = env::var("TYCHO_AUTH_TOKEN")
            .expect("TYCHO_AUTH_TOKEN must be set to run this test");
        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
            .auth_key(Some(token))
            .build()
            .await;

        // Put the error in the assertion message instead of a stray `dbg!`.
        assert!(
            receiver.is_ok(),
            "Client should build successfully with exchanges registered: {:?}",
            receiver.as_ref().err()
        );
    }
}