1use std::{
2 cmp::max,
3 collections::{HashMap, HashSet},
4 env,
5 time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
12
13use crate::{
14 deltas::DeltasClient,
15 feed::{
16 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
17 BlockSynchronizer, BlockSynchronizerError, FeedMessage,
18 },
19 rpc::RPCClient,
20 HttpRPCClient, WsDeltasClient,
21};
22
23#[derive(Error, Debug)]
24pub enum StreamError {
25 #[error("Error during stream set up: {0}")]
26 SetUpError(String),
27
28 #[error("WebSocket client connection error: {0}")]
29 WebSocketConnectionError(String),
30
31 #[error("BlockSynchronizer error: {0}")]
32 BlockSynchronizerError(String),
33}
34
35#[non_exhaustive]
36#[derive(Clone, Debug)]
37pub enum RetryConfiguration {
38 Constant(ConstantRetryConfiguration),
39}
40
41impl RetryConfiguration {
42 pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
43 RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
44 }
45}
46
47#[derive(Clone, Debug)]
48pub struct ConstantRetryConfiguration {
49 max_attempts: u64,
50 cooldown: Duration,
51}
52
53pub struct TychoStreamBuilder {
54 tycho_url: String,
55 chain: Chain,
56 exchanges: HashMap<String, ComponentFilter>,
57 block_time: u64,
58 timeout: u64,
59 startup_timeout: Duration,
60 max_missed_blocks: u64,
61 state_sync_retry_config: RetryConfiguration,
62 websockets_retry_config: RetryConfiguration,
63 no_state: bool,
64 auth_key: Option<String>,
65 no_tls: bool,
66 include_tvl: bool,
67}
68
69impl TychoStreamBuilder {
70 pub fn new(tycho_url: &str, chain: Chain) -> Self {
73 let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
74 Self {
75 tycho_url: tycho_url.to_string(),
76 chain,
77 exchanges: HashMap::new(),
78 block_time,
79 timeout,
80 startup_timeout: Duration::from_secs(block_time * max_missed_blocks),
81 max_missed_blocks,
82 state_sync_retry_config: RetryConfiguration::constant(
83 32,
84 Duration::from_secs(max(block_time / 4, 2)),
85 ),
86 websockets_retry_config: RetryConfiguration::constant(
87 128,
88 Duration::from_secs(max(block_time / 6, 1)),
89 ),
90 no_state: false,
91 auth_key: None,
92 no_tls: true,
93 include_tvl: false,
94 }
95 }
96
97 fn default_timing(chain: &Chain) -> (u64, u64, u64) {
100 match chain {
101 Chain::Ethereum => (12, 36, 50),
102 Chain::Starknet => (2, 8, 50),
103 Chain::ZkSync => (3, 12, 50),
104 Chain::Arbitrum => (1, 2, 100), Chain::Base => (2, 12, 50),
106 Chain::Bsc => (1, 12, 50),
107 Chain::Unichain => (1, 10, 100),
108 }
109 }
110
111 pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
113 self.exchanges
114 .insert(name.to_string(), filter);
115 self
116 }
117
118 pub fn block_time(mut self, block_time: u64) -> Self {
120 self.block_time = block_time;
121 self
122 }
123
124 pub fn timeout(mut self, timeout: u64) -> Self {
126 self.timeout = timeout;
127 self
128 }
129
130 pub fn startup_timeout(mut self, timeout: Duration) -> Self {
131 self.startup_timeout = timeout;
132 self
133 }
134
135 pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
136 self.max_missed_blocks = max_missed_blocks;
137 self
138 }
139
140 pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
141 self.websockets_retry_config = retry_config.clone();
142 self.warn_on_potential_timing_issues();
143 self
144 }
145
146 pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
147 self.state_sync_retry_config = retry_config.clone();
148 self.warn_on_potential_timing_issues();
149 self
150 }
151
152 fn warn_on_potential_timing_issues(&self) {
153 let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
154 (&self.state_sync_retry_config, &self.websockets_retry_config);
155
156 if ws_config.cooldown >= state_config.cooldown {
157 warn!(
158 "Websocket cooldown should be < than state syncronizer cooldown \
159 to avoid spending retries due to disconnected websocket."
160 )
161 }
162 }
163
164 pub fn no_state(mut self, no_state: bool) -> Self {
166 self.no_state = no_state;
167 self
168 }
169
170 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
175 self.auth_key = auth_key;
176 self.no_tls = false;
177 self
178 }
179
180 pub fn no_tls(mut self, no_tls: bool) -> Self {
182 self.no_tls = no_tls;
183 self
184 }
185
186 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
190 self.include_tvl = include_tvl;
191 self
192 }
193
194 pub async fn build(
197 self,
198 ) -> Result<
199 (JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
200 StreamError,
201 > {
202 if self.exchanges.is_empty() {
203 return Err(StreamError::SetUpError(
204 "At least one exchange must be registered.".to_string(),
205 ));
206 }
207
208 let auth_key = self
210 .auth_key
211 .clone()
212 .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
213
214 info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
215
216 let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
218 info!("Using non-secure connection: ws:// and http://");
219 let tycho_ws_url = format!("ws://{}", self.tycho_url);
220 let tycho_rpc_url = format!("http://{}", self.tycho_url);
221 (tycho_ws_url, tycho_rpc_url)
222 } else {
223 info!("Using secure connection: wss:// and https://");
224 let tycho_ws_url = format!("wss://{}", self.tycho_url);
225 let tycho_rpc_url = format!("https://{}", self.tycho_url);
226 (tycho_ws_url, tycho_rpc_url)
227 };
228
229 #[allow(unreachable_patterns)]
231 let ws_client = match &self.websockets_retry_config {
232 RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
233 &tycho_ws_url,
234 auth_key.as_deref(),
235 config.max_attempts,
236 config.cooldown,
237 ),
238 _ => {
239 return Err(StreamError::SetUpError(
240 "Unknown websocket configuration variant!".to_string(),
241 ));
242 }
243 }
244 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
245 let rpc_client = HttpRPCClient::new(&tycho_rpc_url, auth_key.as_deref())
246 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
247 let ws_jh = ws_client
248 .connect()
249 .await
250 .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
251
252 let mut block_sync = BlockSynchronizer::new(
254 Duration::from_secs(self.block_time),
255 Duration::from_secs(self.timeout),
256 self.max_missed_blocks,
257 );
258
259 self.display_available_protocols(&rpc_client)
260 .await;
261
262 for (name, filter) in self.exchanges {
264 info!("Registering exchange: {}", name);
265 let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
266 #[allow(unreachable_patterns)]
267 let sync = match &self.state_sync_retry_config {
268 RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
269 id.clone(),
270 true,
271 filter,
272 retry_config.max_attempts,
273 retry_config.cooldown,
274 !self.no_state,
275 self.include_tvl,
276 rpc_client.clone(),
277 ws_client.clone(),
278 self.block_time + self.timeout,
279 ),
280 _ => {
281 return Err(StreamError::SetUpError(
282 "Unknown state synchronizer configuration variant!".to_string(),
283 ));
284 }
285 };
286 block_sync = block_sync.register_synchronizer(id, sync);
287 }
288
289 let (sync_jh, rx) = block_sync
291 .run()
292 .await
293 .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
294
295 let handle = tokio::spawn(async move {
297 tokio::select! {
298 res = ws_jh => {
299 let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
300 }
301 res = sync_jh => {
302 res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
303 }
304 }
305 if let Err(e) = ws_client.close().await {
306 warn!(?e, "Failed to close WebSocket client");
307 }
308 });
309
310 Ok((handle, rx))
311 }
312
313 async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
316 let available_protocols_set = rpc_client
317 .get_protocol_systems(&ProtocolSystemsRequestBody {
318 chain: self.chain,
319 pagination: PaginationParams { page: 0, page_size: 100 },
320 })
321 .await
322 .map(|resp| {
323 resp.protocol_systems
324 .into_iter()
325 .collect::<HashSet<_>>()
326 })
327 .map_err(|e| {
328 warn!(
329 "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
330 );
331 e
332 })
333 .ok();
334
335 if let Some(not_requested_protocols) = available_protocols_set
336 .map(|available_protocols_set| {
337 let requested_protocol_set = self
338 .exchanges
339 .keys()
340 .cloned()
341 .collect::<HashSet<_>>();
342
343 available_protocols_set
344 .difference(&requested_protocol_set)
345 .cloned()
346 .collect::<Vec<_>>()
347 })
348 .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
349 {
350 info!("Other available protocols: {}", not_requested_protocols.join(", "))
351 }
352 }
353}
354
355#[cfg(test)]
356mod tests {
357 use super::*;
358
359 #[test]
360 fn test_retry_configuration_constant() {
361 let config = RetryConfiguration::constant(5, Duration::from_secs(10));
362 match config {
363 RetryConfiguration::Constant(c) => {
364 assert_eq!(c.max_attempts, 5);
365 assert_eq!(c.cooldown, Duration::from_secs(10));
366 }
367 }
368 }
369
370 #[test]
371 fn test_stream_builder_retry_configs() {
372 let mut builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
373 let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
374 let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));
375
376 builder = builder
377 .websockets_retry_config(&ws_config)
378 .state_synchronizer_retry_config(&state_config);
379
380 match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
382 (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
383 assert_eq!(ws.max_attempts, 10);
384 assert_eq!(ws.cooldown, Duration::from_secs(2));
385 assert_eq!(state.max_attempts, 20);
386 assert_eq!(state.cooldown, Duration::from_secs(5));
387 }
388 }
389 }
390
391 #[tokio::test]
392 async fn test_no_exchanges() {
393 let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
394 .auth_key(Some("my_api_key".into()))
395 .build()
396 .await;
397 assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
398 }
399
400 #[ignore = "require tycho gateway"]
401 #[tokio::test]
402 async fn teat_simple_build() {
403 let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
404 let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
405 .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
406 .auth_key(Some(token))
407 .build()
408 .await;
409
410 dbg!(&receiver);
411
412 assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
413 }
414}