1use std::{
2 cmp::max,
3 collections::{HashMap, HashSet},
4 env,
5 time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
12
13use crate::{
14 deltas::DeltasClient,
15 feed::{
16 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
17 BlockSynchronizer, BlockSynchronizerError, FeedMessage,
18 },
19 rpc::{HttpRPCClientOptions, RPCClient},
20 HttpRPCClient, WsDeltasClient,
21};
22
23#[derive(Error, Debug)]
24pub enum StreamError {
25 #[error("Error during stream set up: {0}")]
26 SetUpError(String),
27
28 #[error("WebSocket client connection error: {0}")]
29 WebSocketConnectionError(String),
30
31 #[error("BlockSynchronizer error: {0}")]
32 BlockSynchronizerError(String),
33}
34
35#[non_exhaustive]
36#[derive(Clone, Debug)]
37pub enum RetryConfiguration {
38 Constant(ConstantRetryConfiguration),
39}
40
41impl RetryConfiguration {
42 pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
43 RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
44 }
45}
46
/// Parameters of a constant retry policy; built through [`RetryConfiguration::constant`].
#[derive(Debug, Clone)]
pub struct ConstantRetryConfiguration {
    /// Maximum number of attempts.
    max_attempts: u64,
    /// Cooldown duration associated with the policy.
    cooldown: Duration,
}
52
53pub struct TychoStreamBuilder {
54 tycho_url: String,
55 chain: Chain,
56 exchanges: HashMap<String, ComponentFilter>,
57 block_time: u64,
58 timeout: u64,
59 startup_timeout: Duration,
60 max_missed_blocks: u64,
61 state_sync_retry_config: RetryConfiguration,
62 websockets_retry_config: RetryConfiguration,
63 no_state: bool,
64 auth_key: Option<String>,
65 no_tls: bool,
66 include_tvl: bool,
67 compression: bool,
68 partial_blocks: bool,
69}
70
71impl TychoStreamBuilder {
72 pub fn new(tycho_url: &str, chain: Chain) -> Self {
75 let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
76 Self {
77 tycho_url: tycho_url.to_string(),
78 chain,
79 exchanges: HashMap::new(),
80 block_time,
81 timeout,
82 startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
83 max_missed_blocks,
84 state_sync_retry_config: RetryConfiguration::constant(
85 32,
86 Duration::from_secs(max(block_time / 4, 2)),
87 ),
88 websockets_retry_config: RetryConfiguration::constant(
89 128,
90 Duration::from_secs(max(block_time / 6, 1)),
91 ),
92 no_state: false,
93 auth_key: None,
94 no_tls: true,
95 include_tvl: false,
96 compression: true,
97 partial_blocks: false,
98 }
99 }
100
101 fn default_timing(chain: &Chain) -> (u64, u64, u64) {
104 match chain {
105 Chain::Ethereum => (12, 36, 50),
106 Chain::Starknet => (2, 8, 50),
107 Chain::ZkSync => (3, 12, 50),
108 Chain::Arbitrum => (1, 2, 100), Chain::Base => (2, 12, 50),
110 Chain::Bsc => (1, 12, 50),
111 Chain::Unichain => (1, 10, 100),
112 }
113 }
114
115 pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
117 self.exchanges
118 .insert(name.to_string(), filter);
119 self
120 }
121
122 pub fn block_time(mut self, block_time: u64) -> Self {
124 self.block_time = block_time;
125 self
126 }
127
128 pub fn timeout(mut self, timeout: u64) -> Self {
130 self.timeout = timeout;
131 self
132 }
133
134 pub fn startup_timeout(mut self, timeout: Duration) -> Self {
135 self.startup_timeout = timeout;
136 self
137 }
138
139 pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
140 self.max_missed_blocks = max_missed_blocks;
141 self
142 }
143
144 pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
145 self.websockets_retry_config = retry_config.clone();
146 self.warn_on_potential_timing_issues();
147 self
148 }
149
150 pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
151 self.state_sync_retry_config = retry_config.clone();
152 self.warn_on_potential_timing_issues();
153 self
154 }
155
156 fn warn_on_potential_timing_issues(&self) {
157 let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
158 (&self.state_sync_retry_config, &self.websockets_retry_config);
159
160 if ws_config.cooldown >= state_config.cooldown {
161 warn!(
162 "Websocket cooldown should be < than state syncronizer cooldown \
163 to avoid spending retries due to disconnected websocket."
164 )
165 }
166 }
167
168 pub fn no_state(mut self, no_state: bool) -> Self {
170 self.no_state = no_state;
171 self
172 }
173
174 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
179 self.auth_key = auth_key;
180 self.no_tls = false;
181 self
182 }
183
184 pub fn no_tls(mut self, no_tls: bool) -> Self {
186 self.no_tls = no_tls;
187 self
188 }
189
190 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
194 self.include_tvl = include_tvl;
195 self
196 }
197
198 pub fn disable_compression(mut self) -> Self {
201 self.compression = false;
202 self
203 }
204
205 pub fn with_partial_blocks(mut self, val: bool) -> Self {
208 self.partial_blocks = val;
209 self
210 }
211
212 pub async fn build(
215 self,
216 ) -> Result<
217 (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
218 StreamError,
219 > {
220 if self.exchanges.is_empty() {
221 return Err(StreamError::SetUpError(
222 "At least one exchange must be registered.".to_string(),
223 ));
224 }
225
226 let auth_key = self
228 .auth_key
229 .clone()
230 .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
231
232 info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
233
234 let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
236 info!("Using non-secure connection: ws:// and http://");
237 let tycho_ws_url = format!("ws://{}", self.tycho_url);
238 let tycho_rpc_url = format!("http://{}", self.tycho_url);
239 (tycho_ws_url, tycho_rpc_url)
240 } else {
241 info!("Using secure connection: wss:// and https://");
242 let tycho_ws_url = format!("wss://{}", self.tycho_url);
243 let tycho_rpc_url = format!("https://{}", self.tycho_url);
244 (tycho_ws_url, tycho_rpc_url)
245 };
246
247 let ws_client = match &self.websockets_retry_config {
249 RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
250 &tycho_ws_url,
251 auth_key.as_deref(),
252 config.max_attempts,
253 config.cooldown,
254 ),
255 }
256 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
257 let rpc_client = HttpRPCClient::new(
258 &tycho_rpc_url,
259 HttpRPCClientOptions::new()
260 .with_auth_key(auth_key)
261 .with_compression(self.compression),
262 )
263 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
264 let ws_jh = ws_client
265 .connect()
266 .await
267 .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
268
269 let mut block_sync = BlockSynchronizer::new(
271 Duration::from_secs(self.block_time),
272 Duration::from_secs(self.timeout),
273 self.max_missed_blocks,
274 );
275
276 self.display_available_protocols(&rpc_client)
277 .await;
278
279 for (name, filter) in self.exchanges {
281 info!("Registering exchange: {}", name);
282 let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
283 let sync = match &self.state_sync_retry_config {
284 RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
285 id.clone(),
286 true,
287 filter,
288 retry_config.max_attempts,
289 retry_config.cooldown,
290 !self.no_state,
291 self.include_tvl,
292 self.compression,
293 rpc_client.clone(),
294 ws_client.clone(),
295 self.block_time + self.timeout,
296 )
297 .with_partial_blocks(self.partial_blocks),
298 };
299 block_sync = block_sync.register_synchronizer(id, sync);
300 }
301
302 let (sync_jh, rx) = block_sync
304 .run()
305 .await
306 .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
307
308 let handle = tokio::spawn(async move {
310 tokio::select! {
311 res = ws_jh => {
312 let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
313 }
314 res = sync_jh => {
315 res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
316 }
317 }
318 if let Err(e) = ws_client.close().await {
319 warn!(?e, "Failed to close WebSocket client");
320 }
321 });
322
323 Ok((handle, rx))
324 }
325
326 async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
329 let available_protocols_set = rpc_client
330 .get_protocol_systems(&ProtocolSystemsRequestBody {
331 chain: self.chain,
332 pagination: PaginationParams { page: 0, page_size: 100 },
333 })
334 .await
335 .map(|resp| {
336 resp.protocol_systems
337 .into_iter()
338 .collect::<HashSet<_>>()
339 })
340 .map_err(|e| {
341 warn!(
342 "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
343 );
344 e
345 })
346 .ok();
347
348 if let Some(not_requested_protocols) = available_protocols_set
349 .map(|available_protocols_set| {
350 let requested_protocol_set = self
351 .exchanges
352 .keys()
353 .cloned()
354 .collect::<HashSet<_>>();
355
356 available_protocols_set
357 .difference(&requested_protocol_set)
358 .cloned()
359 .collect::<Vec<_>>()
360 })
361 .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
362 {
363 info!("Other available protocols: {}", not_requested_protocols.join(", "))
364 }
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    /// `RetryConfiguration::constant` stores both arguments unchanged.
    #[test]
    fn test_retry_configuration_constant() {
        let config = RetryConfiguration::constant(5, Duration::from_secs(10));
        match config {
            RetryConfiguration::Constant(c) => {
                assert_eq!(c.max_attempts, 5);
                assert_eq!(c.cooldown, Duration::from_secs(10));
            }
        }
    }

    /// Both retry setters store the configuration they were given.
    #[test]
    fn test_stream_builder_retry_configs() {
        let mut builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
        let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
        let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));

        builder = builder
            .websockets_retry_config(&ws_config)
            .state_synchronizer_retry_config(&state_config);

        match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
            (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
                assert_eq!(ws.max_attempts, 10);
                assert_eq!(ws.cooldown, Duration::from_secs(2));
                assert_eq!(state.max_attempts, 20);
                assert_eq!(state.cooldown, Duration::from_secs(5));
            }
        }
    }

    /// A fresh builder has compression on and partial blocks off.
    #[test]
    fn test_default_stream_builder() {
        let builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
        assert!(builder.compression, "Compression should be enabled by default.");
        assert!(!builder.partial_blocks, "partial_blocks should be disabled by default.");
    }

    /// Building without any registered exchange fails during set up, before any connection
    /// is attempted.
    #[tokio::test]
    async fn test_no_exchanges() {
        let result = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .auth_key(Some("my_api_key".into()))
            .build()
            .await;
        assert!(
            matches!(result, Err(StreamError::SetUpError(_))),
            "Client should fail to build when no exchanges are registered."
        );
    }

    /// End-to-end build against a live gateway; needs `TYCHO_AUTH_TOKEN` in the environment.
    #[ignore = "require tycho gateway"]
    #[tokio::test]
    async fn test_simple_build() {
        let token =
            env::var("TYCHO_AUTH_TOKEN").expect("TYCHO_AUTH_TOKEN must be set for this test");
        let result = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
            .auth_key(Some(token))
            .build()
            .await;

        // Report the actual error on failure instead of dumping the whole result via `dbg!`.
        assert!(
            result.is_ok(),
            "Client should build successfully with exchanges registered: {:?}",
            result.as_ref().err()
        );
    }
}