1use std::{
2 cmp::max,
3 collections::{HashMap, HashSet},
4 env,
5 time::Duration,
6};
7
8use thiserror::Error;
9use tokio::{sync::mpsc::Receiver, task::JoinHandle};
10use tracing::{info, warn};
11use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
12
13use crate::{
14 deltas::DeltasClient,
15 feed::{
16 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
17 BlockSynchronizer, FeedMessage,
18 },
19 rpc::RPCClient,
20 HttpRPCClient, WsDeltasClient,
21};
22
23#[derive(Error, Debug)]
24pub enum StreamError {
25 #[error("Error during stream set up: {0}")]
26 SetUpError(String),
27
28 #[error("WebSocket client connection error: {0}")]
29 WebSocketConnectionError(String),
30
31 #[error("BlockSynchronizer error: {0}")]
32 BlockSynchronizerError(String),
33}
34
35#[non_exhaustive]
36#[derive(Clone, Debug)]
37pub enum RetryConfiguration {
38 Constant(ConstantRetryConfiguration),
39}
40
41impl RetryConfiguration {
42 pub fn constant(max_attempts: u64, cooldown: Duration) -> Self {
43 RetryConfiguration::Constant(ConstantRetryConfiguration { max_attempts, cooldown })
44 }
45}
46
47#[derive(Clone, Debug)]
48pub struct ConstantRetryConfiguration {
49 max_attempts: u64,
50 cooldown: Duration,
51}
52
53pub struct TychoStreamBuilder {
54 tycho_url: String,
55 chain: Chain,
56 exchanges: HashMap<String, ComponentFilter>,
57 block_time: u64,
58 timeout: u64,
59 max_missed_blocks: u64,
60 state_sync_retry_config: RetryConfiguration,
61 websockets_retry_config: RetryConfiguration,
62 no_state: bool,
63 auth_key: Option<String>,
64 no_tls: bool,
65 include_tvl: bool,
66}
67
68impl TychoStreamBuilder {
69 pub fn new(tycho_url: &str, chain: Chain) -> Self {
72 let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
73 Self {
74 tycho_url: tycho_url.to_string(),
75 chain,
76 exchanges: HashMap::new(),
77 block_time,
78 timeout,
79 max_missed_blocks,
80 state_sync_retry_config: RetryConfiguration::constant(
81 32,
82 Duration::from_secs(max(block_time / 2, 2)),
83 ),
84 websockets_retry_config: RetryConfiguration::constant(
85 128,
86 Duration::from_secs(max(block_time / 4, 1)),
87 ),
88 no_state: false,
89 auth_key: None,
90 no_tls: true,
91 include_tvl: false,
92 }
93 }
94
95 fn default_timing(chain: &Chain) -> (u64, u64, u64) {
98 match chain {
99 Chain::Ethereum => (12, 36, 10),
100 Chain::Starknet => (2, 8, 50),
101 Chain::ZkSync => (3, 12, 50),
102 Chain::Arbitrum => (1, 2, 100), Chain::Base => (2, 12, 50),
104 Chain::Unichain => (1, 10, 100),
105 }
106 }
107
108 pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
110 self.exchanges
111 .insert(name.to_string(), filter);
112 self
113 }
114
115 pub fn block_time(mut self, block_time: u64) -> Self {
117 self.block_time = block_time;
118 self
119 }
120
121 pub fn timeout(mut self, timeout: u64) -> Self {
123 self.timeout = timeout;
124 self
125 }
126
127 pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
128 self.max_missed_blocks = max_missed_blocks;
129 self
130 }
131
132 pub fn websockets_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
133 self.websockets_retry_config = retry_config.clone();
134 self.warn_on_potential_timing_issues();
135 self
136 }
137
138 pub fn state_synchronizer_retry_config(mut self, retry_config: &RetryConfiguration) -> Self {
139 self.state_sync_retry_config = retry_config.clone();
140 self.warn_on_potential_timing_issues();
141 self
142 }
143
144 fn warn_on_potential_timing_issues(&self) {
145 let (RetryConfiguration::Constant(state_config), RetryConfiguration::Constant(ws_config)) =
146 (&self.state_sync_retry_config, &self.websockets_retry_config);
147
148 if ws_config.cooldown >= state_config.cooldown {
149 warn!(
150 "Websocket cooldown should be < than state syncronizer cooldown \
151 to avoid spending retries due to disconnected websocket."
152 )
153 }
154 }
155
156 pub fn no_state(mut self, no_state: bool) -> Self {
158 self.no_state = no_state;
159 self
160 }
161
162 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
167 self.auth_key = auth_key;
168 self.no_tls = false;
169 self
170 }
171
172 pub fn no_tls(mut self, no_tls: bool) -> Self {
174 self.no_tls = no_tls;
175 self
176 }
177
178 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
182 self.include_tvl = include_tvl;
183 self
184 }
185
186 pub async fn build(
189 self,
190 ) -> Result<(JoinHandle<()>, Receiver<FeedMessage<BlockHeader>>), StreamError> {
191 if self.exchanges.is_empty() {
192 return Err(StreamError::SetUpError(
193 "At least one exchange must be registered.".to_string(),
194 ));
195 }
196
197 let auth_key = self
199 .auth_key
200 .clone()
201 .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
202
203 let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
205 info!("Using non-secure connection: ws:// and http://");
206 let tycho_ws_url = format!("ws://{}", self.tycho_url);
207 let tycho_rpc_url = format!("http://{}", self.tycho_url);
208 (tycho_ws_url, tycho_rpc_url)
209 } else {
210 info!("Using secure connection: wss:// and https://");
211 let tycho_ws_url = format!("wss://{}", self.tycho_url);
212 let tycho_rpc_url = format!("https://{}", self.tycho_url);
213 (tycho_ws_url, tycho_rpc_url)
214 };
215
216 #[allow(unreachable_patterns)]
218 let ws_client = match &self.websockets_retry_config {
219 RetryConfiguration::Constant(config) => WsDeltasClient::new_with_reconnects(
220 &tycho_ws_url,
221 auth_key.as_deref(),
222 config.max_attempts,
223 config.cooldown,
224 ),
225 _ => {
226 return Err(StreamError::SetUpError(
227 "Unknown websocket configuration variant!".to_string(),
228 ));
229 }
230 }
231 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
232 let rpc_client = HttpRPCClient::new(&tycho_rpc_url, auth_key.as_deref())
233 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
234 let ws_jh = ws_client
235 .connect()
236 .await
237 .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
238
239 let mut block_sync = BlockSynchronizer::new(
241 Duration::from_secs(self.block_time),
242 Duration::from_secs(self.timeout),
243 self.max_missed_blocks,
244 );
245
246 self.display_available_protocols(&rpc_client)
247 .await;
248
249 for (name, filter) in self.exchanges {
251 info!("Registering exchange: {}", name);
252 let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
253 #[allow(unreachable_patterns)]
254 let sync = match &self.state_sync_retry_config {
255 RetryConfiguration::Constant(retry_config) => ProtocolStateSynchronizer::new(
256 id.clone(),
257 true,
258 filter,
259 retry_config.max_attempts,
260 retry_config.cooldown,
261 !self.no_state,
262 self.include_tvl,
263 rpc_client.clone(),
264 ws_client.clone(),
265 self.block_time + self.timeout,
266 ),
267 _ => {
268 return Err(StreamError::SetUpError(
269 "Unknown state synchronizer configuration variant!".to_string(),
270 ));
271 }
272 };
273 block_sync = block_sync.register_synchronizer(id, sync);
274 }
275
276 let (sync_jh, rx) = block_sync
278 .run()
279 .await
280 .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
281
282 let handle = tokio::spawn(async move {
284 tokio::select! {
285 res = ws_jh => {
286 let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
287 }
288 res = sync_jh => {
289 res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
290 }
291 }
292 if let Err(e) = ws_client.close().await {
293 warn!(?e, "Failed to close WebSocket client");
294 }
295 });
296
297 Ok((handle, rx))
298 }
299
300 async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
303 let available_protocols_set = rpc_client
304 .get_protocol_systems(&ProtocolSystemsRequestBody {
305 chain: self.chain,
306 pagination: PaginationParams { page: 0, page_size: 100 },
307 })
308 .await
309 .map(|resp| {
310 resp.protocol_systems
311 .into_iter()
312 .collect::<HashSet<_>>()
313 })
314 .map_err(|e| {
315 warn!(
316 "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
317 );
318 e
319 })
320 .ok();
321
322 if let Some(not_requested_protocols) = available_protocols_set
323 .map(|available_protocols_set| {
324 let requested_protocol_set = self
325 .exchanges
326 .keys()
327 .cloned()
328 .collect::<HashSet<_>>();
329
330 available_protocols_set
331 .difference(&requested_protocol_set)
332 .cloned()
333 .collect::<Vec<_>>()
334 })
335 .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
336 {
337 info!("Other available protocols: {}", not_requested_protocols.join(", "))
338 }
339 }
340}
341
342#[cfg(test)]
343mod tests {
344 use super::*;
345
346 #[test]
347 fn test_retry_configuration_constant() {
348 let config = RetryConfiguration::constant(5, Duration::from_secs(10));
349 match config {
350 RetryConfiguration::Constant(c) => {
351 assert_eq!(c.max_attempts, 5);
352 assert_eq!(c.cooldown, Duration::from_secs(10));
353 }
354 }
355 }
356
357 #[test]
358 fn test_stream_builder_retry_configs() {
359 let mut builder = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum);
360 let ws_config = RetryConfiguration::constant(10, Duration::from_secs(2));
361 let state_config = RetryConfiguration::constant(20, Duration::from_secs(5));
362
363 builder = builder
364 .websockets_retry_config(&ws_config)
365 .state_synchronizer_retry_config(&state_config);
366
367 match (&builder.websockets_retry_config, &builder.state_sync_retry_config) {
369 (RetryConfiguration::Constant(ws), RetryConfiguration::Constant(state)) => {
370 assert_eq!(ws.max_attempts, 10);
371 assert_eq!(ws.cooldown, Duration::from_secs(2));
372 assert_eq!(state.max_attempts, 20);
373 assert_eq!(state.cooldown, Duration::from_secs(5));
374 }
375 }
376 }
377
378 #[tokio::test]
379 async fn test_no_exchanges() {
380 let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
381 .auth_key(Some("my_api_key".into()))
382 .build()
383 .await;
384 assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
385 }
386
387 #[ignore = "require tycho gateway"]
388 #[tokio::test]
389 async fn teat_simple_build() {
390 let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
391 let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
392 .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
393 .auth_key(Some(token))
394 .build()
395 .await;
396
397 dbg!(&receiver);
398
399 assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
400 }
401}