1use std::{
2 collections::{HashMap, HashSet},
3 env,
4 time::Duration,
5};
6
7use thiserror::Error;
8use tokio::{sync::mpsc::Receiver, task::JoinHandle};
9use tracing::{info, warn};
10use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
11
12use crate::{
13 deltas::DeltasClient,
14 feed::{
15 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer, BlockHeader,
16 BlockSynchronizer, FeedMessage,
17 },
18 rpc::RPCClient,
19 HttpRPCClient, WsDeltasClient,
20};
21
22#[derive(Error, Debug)]
23pub enum StreamError {
24 #[error("Error during stream set up: {0}")]
25 SetUpError(String),
26
27 #[error("WebSocket client connection error: {0}")]
28 WebSocketConnectionError(String),
29
30 #[error("BlockSynchronizer error: {0}")]
31 BlockSynchronizerError(String),
32}
33
34pub struct TychoStreamBuilder {
35 tycho_url: String,
36 chain: Chain,
37 exchanges: HashMap<String, ComponentFilter>,
38 block_time: u64,
39 timeout: u64,
40 max_missed_blocks: u64,
41 no_state: bool,
42 auth_key: Option<String>,
43 no_tls: bool,
44 include_tvl: bool,
45}
46
47impl TychoStreamBuilder {
48 pub fn new(tycho_url: &str, chain: Chain) -> Self {
51 let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
52 Self {
53 tycho_url: tycho_url.to_string(),
54 chain,
55 exchanges: HashMap::new(),
56 block_time,
57 timeout,
58 max_missed_blocks,
59 no_state: false,
60 auth_key: None,
61 no_tls: true,
62 include_tvl: false,
63 }
64 }
65
66 fn default_timing(chain: &Chain) -> (u64, u64, u64) {
69 match chain {
70 Chain::Ethereum => (12, 36, 10),
71 Chain::Starknet => (2, 8, 50),
72 Chain::ZkSync => (3, 12, 50),
73 Chain::Arbitrum => (1, 2, 100), Chain::Base => (2, 12, 50),
75 Chain::Unichain => (1, 10, 100),
76 }
77 }
78
79 pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
81 self.exchanges
82 .insert(name.to_string(), filter);
83 self
84 }
85
86 pub fn block_time(mut self, block_time: u64) -> Self {
88 self.block_time = block_time;
89 self
90 }
91
92 pub fn timeout(mut self, timeout: u64) -> Self {
94 self.timeout = timeout;
95 self
96 }
97
98 pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
99 self.max_missed_blocks = max_missed_blocks;
100 self
101 }
102
103 pub fn no_state(mut self, no_state: bool) -> Self {
105 self.no_state = no_state;
106 self
107 }
108
109 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
114 self.auth_key = auth_key;
115 self.no_tls = false;
116 self
117 }
118
119 pub fn no_tls(mut self, no_tls: bool) -> Self {
121 self.no_tls = no_tls;
122 self
123 }
124
125 pub fn include_tvl(mut self, include_tvl: bool) -> Self {
129 self.include_tvl = include_tvl;
130 self
131 }
132
133 pub async fn build(
136 self,
137 ) -> Result<(JoinHandle<()>, Receiver<FeedMessage<BlockHeader>>), StreamError> {
138 if self.exchanges.is_empty() {
139 return Err(StreamError::SetUpError(
140 "At least one exchange must be registered.".to_string(),
141 ));
142 }
143
144 let auth_key = self
146 .auth_key
147 .clone()
148 .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
149
150 let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
152 info!("Using non-secure connection: ws:// and http://");
153 let tycho_ws_url = format!("ws://{}", self.tycho_url);
154 let tycho_rpc_url = format!("http://{}", self.tycho_url);
155 (tycho_ws_url, tycho_rpc_url)
156 } else {
157 info!("Using secure connection: wss:// and https://");
158 let tycho_ws_url = format!("wss://{}", self.tycho_url);
159 let tycho_rpc_url = format!("https://{}", self.tycho_url);
160 (tycho_ws_url, tycho_rpc_url)
161 };
162
163 let ws_client = WsDeltasClient::new(&tycho_ws_url, auth_key.as_deref())
165 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
166 let rpc_client = HttpRPCClient::new(&tycho_rpc_url, auth_key.as_deref())
167 .map_err(|e| StreamError::SetUpError(e.to_string()))?;
168 let ws_jh = ws_client
169 .connect()
170 .await
171 .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
172
173 let mut block_sync = BlockSynchronizer::new(
175 Duration::from_secs(self.block_time),
176 Duration::from_secs(self.timeout),
177 self.max_missed_blocks,
178 );
179
180 self.display_available_protocols(&rpc_client)
181 .await;
182
183 for (name, filter) in self.exchanges {
185 info!("Registering exchange: {}", name);
186 let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
187 let sync = ProtocolStateSynchronizer::new(
188 id.clone(),
189 true,
190 filter,
191 3,
192 !self.no_state,
193 self.include_tvl,
194 rpc_client.clone(),
195 ws_client.clone(),
196 self.block_time + self.timeout,
197 );
198 block_sync = block_sync.register_synchronizer(id, sync);
199 }
200
201 let (sync_jh, rx) = block_sync
203 .run()
204 .await
205 .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
206
207 let handle = tokio::spawn(async move {
209 tokio::select! {
210 res = ws_jh => {
211 let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
212 }
213 res = sync_jh => {
214 res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
215 }
216 }
217 });
218
219 Ok((handle, rx))
220 }
221
222 async fn display_available_protocols(&self, rpc_client: &HttpRPCClient) {
225 let available_protocols_set = rpc_client
226 .get_protocol_systems(&ProtocolSystemsRequestBody {
227 chain: self.chain,
228 pagination: PaginationParams { page: 0, page_size: 100 },
229 })
230 .await
231 .map(|resp| {
232 resp.protocol_systems
233 .into_iter()
234 .collect::<HashSet<_>>()
235 })
236 .map_err(|e| {
237 warn!(
238 "Failed to fetch protocol systems: {e}. Skipping protocol availability check."
239 );
240 e
241 })
242 .ok();
243
244 if let Some(not_requested_protocols) = available_protocols_set
245 .map(|available_protocols_set| {
246 let requested_protocol_set = self
247 .exchanges
248 .keys()
249 .cloned()
250 .collect::<HashSet<_>>();
251
252 available_protocols_set
253 .difference(&requested_protocol_set)
254 .cloned()
255 .collect::<Vec<_>>()
256 })
257 .filter(|not_requested_protocols| !not_requested_protocols.is_empty())
258 {
259 info!("Other available protocols: {}", not_requested_protocols.join(", "))
260 }
261 }
262}
263
264#[cfg(test)]
265mod tests {
266 use super::*;
267
268 #[tokio::test]
269 async fn test_no_exchanges() {
270 let receiver = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
271 .auth_key(Some("my_api_key".into()))
272 .build()
273 .await;
274 assert!(receiver.is_err(), "Client should fail to build when no exchanges are registered.");
275 }
276
277 #[ignore = "require tycho gateway"]
278 #[tokio::test]
279 async fn teat_simple_build() {
280 let token = env::var("TYCHO_AUTH_TOKEN").unwrap();
281 let receiver = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
282 .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
283 .auth_key(Some(token))
284 .build()
285 .await;
286
287 dbg!(&receiver);
288
289 assert!(receiver.is_ok(), "Client should build successfully with exchanges registered.");
290 }
291}