1use std::{
2 collections::{HashMap, HashSet},
3 env,
4 time::Duration,
5};
6
7use thiserror::Error;
8use tokio::{sync::mpsc::Receiver, task::JoinHandle};
9use tracing::info;
10use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
11
12use crate::{
13 deltas::DeltasClient,
14 feed::{
15 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
16 BlockSynchronizer, FeedMessage,
17 },
18 rpc::RPCClient,
19 HttpRPCClient, WsDeltasClient,
20};
21
22#[derive(Error, Debug)]
23pub enum StreamError {
24 #[error("Error during stream set up: {0}")]
25 SetUpError(String),
26
27 #[error("WebSocket client connection error: {0}")]
28 WebSocketConnectionError(String),
29
30 #[error("BlockSynchronizer error: {0}")]
31 BlockSynchronizerError(String),
32}
33
34pub struct TychoStreamBuilder {
35 tycho_url: String,
36 chain: Chain,
37 exchanges: HashMap<String, ComponentFilter>,
38 block_time: u64,
39 timeout: u64,
40 max_missed_blocks: u64,
41 no_state: bool,
42 auth_key: Option<String>,
43 no_tls: bool,
44}
45
46impl TychoStreamBuilder {
47 pub fn new(tycho_url: &str, chain: Chain) -> Self {
50 let (block_time, timeout, max_missed_blocks) = Self::default_timing(&chain);
51 Self {
52 tycho_url: tycho_url.to_string(),
53 chain,
54 exchanges: HashMap::new(),
55 block_time,
56 timeout,
57 max_missed_blocks,
58 no_state: false,
59 auth_key: None,
60 no_tls: true,
61 }
62 }
63
64 fn default_timing(chain: &Chain) -> (u64, u64, u64) {
67 match chain {
68 Chain::Ethereum => (12, 36, 10),
69 Chain::Starknet => (2, 8, 50),
70 Chain::ZkSync => (3, 12, 50),
71 Chain::Arbitrum => (1, 2, 100), Chain::Base => (2, 12, 50),
73 Chain::Unichain => (1, 10, 100),
74 }
75 }
76
77 pub fn exchange(mut self, name: &str, filter: ComponentFilter) -> Self {
79 self.exchanges
80 .insert(name.to_string(), filter);
81 self
82 }
83
84 pub fn block_time(mut self, block_time: u64) -> Self {
86 self.block_time = block_time;
87 self
88 }
89
90 pub fn timeout(mut self, timeout: u64) -> Self {
92 self.timeout = timeout;
93 self
94 }
95
96 pub fn max_missed_blocks(mut self, max_missed_blocks: u64) -> Self {
97 self.max_missed_blocks = max_missed_blocks;
98 self
99 }
100
101 pub fn no_state(mut self, no_state: bool) -> Self {
103 self.no_state = no_state;
104 self
105 }
106
107 pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
112 self.auth_key = auth_key;
113 self.no_tls = false;
114 self
115 }
116
117 pub fn no_tls(mut self, no_tls: bool) -> Self {
119 self.no_tls = no_tls;
120 self
121 }
122
123 pub async fn build(self) -> Result<(JoinHandle<()>, Receiver<FeedMessage>), StreamError> {
126 if self.exchanges.is_empty() {
127 return Err(StreamError::SetUpError(
128 "At least one exchange must be registered.".to_string(),
129 ));
130 }
131
132 let auth_key = self
134 .auth_key
135 .or_else(|| env::var("TYCHO_AUTH_TOKEN").ok());
136
137 let (tycho_ws_url, tycho_rpc_url) = if self.no_tls {
139 info!("Using non-secure connection: ws:// and http://");
140 let tycho_ws_url = format!("ws://{}", self.tycho_url);
141 let tycho_rpc_url = format!("http://{}", self.tycho_url);
142 (tycho_ws_url, tycho_rpc_url)
143 } else {
144 info!("Using secure connection: wss:// and https://");
145 let tycho_ws_url = format!("wss://{}", self.tycho_url);
146 let tycho_rpc_url = format!("https://{}", self.tycho_url);
147 (tycho_ws_url, tycho_rpc_url)
148 };
149
150 let ws_client = WsDeltasClient::new(&tycho_ws_url, auth_key.as_deref()).unwrap();
152 let rpc_client = HttpRPCClient::new(&tycho_rpc_url, auth_key.as_deref()).unwrap();
153 let ws_jh = ws_client
154 .connect()
155 .await
156 .map_err(|e| StreamError::WebSocketConnectionError(e.to_string()))?;
157
158 let mut block_sync = BlockSynchronizer::new(
160 Duration::from_secs(self.block_time),
161 Duration::from_secs(self.timeout),
162 self.max_missed_blocks,
163 );
164
165 let available_protocols_set = rpc_client
166 .get_protocol_systems(&ProtocolSystemsRequestBody {
167 chain: self.chain,
168 pagination: PaginationParams { page: 0, page_size: 100 },
169 })
170 .await
171 .unwrap()
172 .protocol_systems
173 .into_iter()
174 .collect::<HashSet<_>>();
175
176 let requested_protocol_set = self
177 .exchanges
178 .keys()
179 .cloned()
180 .collect::<HashSet<_>>();
181
182 let not_requested_protocols = available_protocols_set
183 .difference(&requested_protocol_set)
184 .cloned()
185 .collect::<Vec<_>>();
186
187 if !not_requested_protocols.is_empty() {
188 tracing::info!("Other available protocols: {}", not_requested_protocols.join(", "));
189 }
190
191 for (name, filter) in self.exchanges {
193 info!("Registering exchange: {}", name);
194 let id = ExtractorIdentity { chain: self.chain, name: name.clone() };
195 let sync = ProtocolStateSynchronizer::new(
196 id.clone(),
197 true,
198 filter,
199 3,
200 !self.no_state,
201 rpc_client.clone(),
202 ws_client.clone(),
203 self.block_time + self.timeout,
204 );
205 block_sync = block_sync.register_synchronizer(id, sync);
206 }
207
208 let (sync_jh, rx) = block_sync
210 .run()
211 .await
212 .map_err(|e| StreamError::BlockSynchronizerError(e.to_string()))?;
213
214 let handle = tokio::spawn(async move {
216 tokio::select! {
217 res = ws_jh => {
218 let _ = res.map_err(|e| StreamError::WebSocketConnectionError(e.to_string()));
219 }
220 res = sync_jh => {
221 res.map_err(|e| StreamError::BlockSynchronizerError(e.to_string())).unwrap();
222 }
223 }
224 });
225
226 Ok((handle, rx))
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    /// `build` must reject a builder without any registered exchange before doing any I/O.
    #[tokio::test]
    async fn test_no_exchanges() {
        let result = TychoStreamBuilder::new("localhost:4242", Chain::Ethereum)
            .auth_key(Some("my_api_key".into()))
            .build()
            .await;
        assert!(
            matches!(result, Err(StreamError::SetUpError(_))),
            "Client should fail to build when no exchanges are registered."
        );
    }

    /// Smoke test against a live Tycho gateway; needs `TYCHO_AUTH_TOKEN` in the environment.
    #[ignore = "require tycho gateway"]
    #[tokio::test]
    async fn test_simple_build() {
        let token = env::var("TYCHO_AUTH_TOKEN").expect("TYCHO_AUTH_TOKEN must be set");
        let result = TychoStreamBuilder::new("tycho-beta.propellerheads.xyz", Chain::Ethereum)
            .exchange("uniswap_v2", ComponentFilter::with_tvl_range(100.0, 100.0))
            .auth_key(Some(token))
            .build()
            .await;

        // Include the error in the failure message instead of dumping the value with `dbg!`.
        assert!(
            result.is_ok(),
            "Client should build successfully with exchanges registered: {:?}",
            result.as_ref().err()
        );
    }
}