1use std::{
2 cmp::max,
3 collections::{HashMap, HashSet},
4 env,
5 time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
12
13use crate::{
14 deltas::DeltasClient,
15 feed::{
16 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
17 BlockSynchronizer, BlockSynchronizerError, FeedMessage,
18 },
19 rpc::{HttpRPCClientOptions, RPCClient},
20 HttpRPCClient, WsDeltasClient,
21};
22
23#[derive(Error, Debug)]
24pub enum StreamError {
25 #[error("Error during stream set up: {0}")]
26 SetUpError(String),
27
28 #[error("WebSocket client connection error: {0}")]
29 WebSocketConnectionError(String),
30
31 #[error("BlockSynchronizer error: {0}")]
32 BlockSynchronizerError(String),
33}
34
35#[non_exhaustive]
36#[derive(Clone, Debug)]
37pub enum RetryConfiguration {
38 Constant(ConstantRetryConfiguration),
39}
40
41impl RetryConfiguration {
42 pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
43 RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
44 }
45}
46
/// Parameters of a constant retry policy.
///
/// Fields are private; build instances through [`RetryConfiguration::constant`].
#[derive(Debug, Clone)]
pub struct ConstantRetryConfiguration {
    /// Attempt count handed to the component being configured.
    max_attempts: u64,
    /// Cooldown handed to the component being configured.
    cooldown: Duration,
}
52
53pub struct TychoStreamBuilder {
54 tycho_url: String,
55 chain: Chain,
56 exchanges: HashMap<String, ComponentFilter>,
57 block_time: u64,
58 timeout: u64,
59 startup_timeout: Duration,
60 max_missed_blocks: u64,
61 state_sync_retry_config: RetryConfiguration,
62 websockets_retry_config: RetryConfiguration,
63 no_state: bool,
64 auth_key: Option<String>,
65 no_tls: bool,
66 include_tvl: bool,
67 compression: bool,
68 partial_blocks: bool,
69}
70
71impl TychoStreamBuilder {
72 pub fn new(tycho_url: &str, chain: Chain) -> Self {
75 let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
76 Self {
77 tycho_url: tycho_url.to_string(),
78 chain,
79 exchanges: HashMap::new(),
80 block_time,
81 timeout,
82 startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
83 max_missed_blocks,
84 state_sync_retry_config: RetryConfiguration::constant(
85 32,
86 Duration::from_secs(max(block_time / 4, 2)),
87 ),
88 websockets_retry_config: RetryConfiguration::constant(
89 128,
90 Duration::from_secs(max(block_time / 6, 1)),
91 ),
92 no_state: false,
93 auth_key: None,
94 no_tls: true,
95 include_tvl: false,
96 compression: true,
97 partial_blocks: false,
98 }
99 }
100
101 fn default_timing(chain: &Chain) -> (u64, u64, u64) {
104 match chain {
105 Chain::Ethereum => (12, 36, 50),
106 Chain::Starknet => (2, 8, 50),
107 Chain::ZkSync => (3, 12, 50),
108 Chain::Arbitrum => (1, 2, 100), Chain::Base => (2, 12, 50),
110 Chain::Bsc => (1, 12, 50),
111 Chain::Unichain => (1, 10, 100),
112 }
113 }
114
115 pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
117 self.exchanges
118 .insert(name.to_string(), filter);
119 self
120 }
121
122 pub fn block_time(mut self, block_time: u64) -> Self {
124 self.block_time = block_time;
125 self
126 }
127
128 pub fn timeout(mut self, timeout: u64) -> Self {
130 self.timeout = timeout;
131 self
132 }
133
134 pub fn startup_timeout(mut self, timeout: Duration) -> Self {
135 self.startup_timeout = timeout;
136 self
137 }
138
139 pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
140 self.max_missed_blocks = max_missed_blocks;
141 self
142 }
143
144 pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
145 self.websockets_retry_config = retry_config.clone();
146 self.warn_on_potential_timing_issues();
147 self
148 }
149
150 pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
151 self.state_sync_retry_config = retry_config.clone();
152 self.warn_on_potential_timing_issues();
153 self
154 }
155
156 fn warn_on_potential_timing_issues(&self) {
157 let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
158 (&self.state_sync_retry_config, &self.websockets_retry_config);
159
160 if ws_config.cooldown >= state_config.cooldown {
161 warn!(
162 "Websocket cooldown should be < than state syncronizer cooldown \
163 to avoid spending retries due to disconnected websocket."
164 )
165 }
166 }
167
168 pub fn no_state(mut self, no_state: bool) -> Self {
170 self.no_state = no_state;
171 self
172 }
173
174 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
179 self.auth_key = auth_key;
180 self.no_tls = false;
181 self
182 }
183
184 pub fn no_tls(mut self, no_tls: bool) -> Self {
186 self.no_tls = no_tls;
187 self
188 }
189
190 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
194 self.include_tvl = include_tvl;
195 self
196 }
197
198 pub fn disable_compression(mut self) -> Self {
201 self.compression = false;
202 self
203 }
204
205 pub fn enable_partial_blocks(mut self) -> Self {
207 self.partial_blocks = true;
208 self
209 }
210
211 pub async fn build(
214 self,
215 ) -> Result<
216 (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
217 StreamError,
218 > {
219 if self.exchanges.is_empty() {
220 return Err(StreamError::SetUpError(
221 "At least one exchange must be registered.".to_string(),
222 ));
223 }
224
225 let auth_key = self
227 .auth_key
228 .clone()
229 .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
230
231 info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
232
233 let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
235 info!("Using non-secure connection: ws:// and http://");
236 let tycho_ws_url = format!("ws://{}", self.tycho_url);
237 let tycho_rpc_url = format!("http://{}", self.tycho_url);
238 (tycho_ws_url, tycho_rpc_url)
239 } else {
240 info!("Using secure connection: wss:// and https://");
241 let tycho_ws_url = format!("wss://{}", self.tycho_url);
242 let tycho_rpc_url = format!("https://{}", self.tycho_url);
243 (tycho_ws_url, tycho_rpc_url)
244 };
245
246 let ws_client = match &self.websockets_retry_config {
248 RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
249 &tycho_ws_url,
250 auth_key.as_deref(),
251 config.max_attempts,
252 config.cooldown,
253 ),
254 }
255 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
256 let rpc_client = HttpRPCClient::new(
257 &tycho_rpc_url,
258 HttpRPCClientOptions::new()
259 .with_auth_key(auth_key)
260 .with_compression(self.compression),
261 )
262 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
263 let ws_jh = ws_client
264 .connect()
265 .await
266 .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
267
268 let mut block_sync = BlockSynchronizer::new(
270 Duration::from_secs(self.block_time),
271 Duration::from_secs(self.timeout),
272 self.max_missed_blocks,
273 );
274
275 self.display_available_protocols(&rpc_client)
276 .await;
277
278 for (name, filter) in self.exchanges {
280 info!("Registering exchange: {}", name);
281 let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
282 let sync = match &self.state_sync_retry_config {
283 RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
284 id.clone(),
285 true,
286 filter,
287 retry_config.max_attempts,
288 retry_config.cooldown,
289 !self.no_state,
290 self.include_tvl,
291 self.compression,
292 rpc_client.clone(),
293 ws_client.clone(),
294 self.block_time + self.timeout,
295 )
296 .with_partial_blocks(self.partial_blocks),
297 };
298 block_sync = block_sync.register_synchronizer(id, sync);
299 }
300
301 let (sync_jh, rx) = block_sync
303 .run()
304 .await
305 .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
306
307 let handle = tokio::spawn(async move {
309 tokio::select! {
310 res = ws_jh => {
311 let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
312 }
313 res = sync_jh => {
314 res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
315 }
316 }
317 if let Err(e) = ws_client.close().await {
318 warn!(?e, "Failed to close WebSocket client");
319 }
320 });
321
322 Ok((handle, rx))
323 }
324
325 async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
328 let available_protocols_set = rpc_client
329 .get_protocol_systems(&ProtocolSystemsRequestBody {
330 chain: self.chain,
331 pagination: PaginationParams { page: 0, page_size: 100 },
332 })
333 .await
334 .map(|resp| {
335 resp.protocol_systems
336 .into_iter()
337 .collect::<HashSet<_>>()
338 })
339 .map_err(|e| {
340 warn!(
341 "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
342 );
343 e
344 })
345 .ok();
346
347 if let Some(not_requested_protocols) = available_protocols_set
348 .map(|available_protocols_set| {
349 let requested_protocol_set = self
350 .exchanges
351 .keys()
352 .cloned()
353 .collect::<HashSet<_>>();
354
355 available_protocols_set
356 .difference(&requested_protocol_set)
357 .cloned()
358 .collect::<Vec<_>>()
359 })
360 .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
361 {
362 info!("Other available protocols: {}", not_requested_protocols.join(", "))
363 }
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// `RetryConfiguration::constant` stores both parameters unchanged.
    #[test]
    fn test_retry_configuration_constant() {
        let RetryConfiguration::Constant(c) =
            RetryConfiguration::constant(5, Duration::from_secs(10));
        assert_eq!(c.max_attempts, 5);
        assert_eq!(c.cooldown, Duration::from_secs(10));
    }

    /// Both retry setters store the configuration they are given.
    #[test]
    fn test_stream_builder_retry_configs() {
        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));

        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .websockets_retry_config(&ws_config)
            .state_synchronizer_retry_config(&state_config);

        let RetryConfiguration::Constant(ws) = &builder.websockets_retry_config;
        let RetryConfiguration::Constant(state) = &builder.state_sync_retry_config;
        assert_eq!(ws.max_attempts, 10);
        assert_eq!(ws.cooldown, Duration::from_secs(2));
        assert_eq!(state.max_attempts, 20);
        assert_eq!(state.cooldown, Duration::from_secs(5));
    }

    /// A fresh builder carries the documented defaults.
    #[test]
    fn test_default_stream_builder() {
        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
        assert!(builder.compression, "Compression should be enabled by default.");
        assert!(!builder.partial_blocks, "partial_blocks should be disabled by default.");
        assert!(builder.no_tls, "TLS should be disabled by default.");
        assert!(builder.auth_key.is_none(), "No auth key should be set by default.");
        assert_eq!(builder.block_time, 12);
        assert_eq!(builder.timeout, 36);
        assert_eq!(builder.max_missed_blocks, 50);
        assert_eq!(builder.startup_timeout, Duration::from_secs(12 * 50));
    }

    /// Setting an auth key stores it and switches TLS on.
    #[test]
    fn test_auth_key_enables_tls() {
        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .auth_key(Some("my_api_key".into()));
        assert_eq!(builder.auth_key.as_deref(), Some("my_api_key"));
        assert!(!builder.no_tls, "Setting an auth key should enable TLS.");
    }

    /// Building without exchanges fails during set-up, before any connection is attempted.
    #[tokio::test]
    async fn test_no_exchanges() {
        let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .auth_key(Some("my_api_key".into()))
            .build()
            .await;
        assert!(
            matches!(receiver, Err(StreamError::SetUpError(_))),
            "Client should fail to build when no exchanges are registered."
        );
    }

    #[ignore = "require tycho gateway"]
    #[tokio::test]
    async fn test_simple_build() {
        let token = env::var("TYCHO_AUTH_TOKEN")
            .expect("TYCHO_AUTH_TOKEN must be set to run this test");
        let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
            .auth_key(Some(token))
            .build()
            .await;

        // Report the error on failure instead of dumping the whole result with `dbg!`.
        assert!(
            receiver.is_ok(),
            "Client should build successfully with exchanges registered. Error: {:?}",
            receiver.as_ref().err()
        );
    }
}