1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use clap::Parser;
4use tracing::{debug, error, info, warn};
5use tracing_appender::rolling;
6use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
7
8use crate::{
9 deltas::DeltasClient,
10 feed::{
11 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
12 BlockSynchronizer,
13 },
14 rpc::{HttpRPCClientOptions, RPCClient},
15 HttpRPCClient, WsDeltasClient,
16};
17
18#[derive(Parser, Debug, Clone, PartialEq)]
23#[clap(version = env!("CARGO_PKG_VERSION"))]
24struct CliArgs {
25 #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
27 tycho_url: String,
28
29 #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
32 auth_key: Option<String>,
33
34 #[clap(long)]
36 no_tls: bool,
37
38 #[clap(short = 'c', long, default_value = "ethereum")]
40 pub chain: String,
41
42 #[clap(short = 'e', long, number_of_values = 1)]
45 exchange: Vec<String>,
46
47 #[clap(long, default_value = "10")]
50 min_tvl: f64,
51
52 #[clap(long)]
55 remove_tvl_threshold: Option<f64>,
56
57 #[clap(long)]
60 add_tvl_threshold: Option<f64>,
61
62 #[clap(long, default_value = "600")]
69 block_time: u64,
70
71 #[clap(long, default_value = "1")]
74 timeout: u64,
75
76 #[clap(long, default_value = "logs")]
78 log_folder: String,
79
80 #[clap(long)]
82 example: bool,
83
84 #[clap(long)]
87 no_state: bool,
88
89 #[clap(short='n', long, default_value=None)]
93 max_messages: Option<usize>,
94
95 #[clap(long, default_value = "10")]
98 max_missed_blocks: u64,
99
100 #[clap(long)]
104 include_tvl: bool,
105
106 #[clap(long)]
109 disable_compression: bool,
110
111 #[clap(long)]
115 partial_blocks: bool,
116
117 #[clap(long)]
120 verbose: bool,
121}
122
123impl CliArgs {
124 fn validate(&self) -> Result<(), String> {
125 match (self.remove_tvl_threshold, self.add_tvl_threshold) {
127 (Some(remove), Some(add)) if remove >= add => {
128 return Err("remove_tvl_threshold must be less than add_tvl_threshold".to_string());
129 }
130 (Some(_), None) | (None, Some(_)) => {
131 return Err(
132 "Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string()
133 );
134 }
135 _ => {}
136 }
137
138 Ok(())
139 }
140}
141
142pub async fn run_cli() -> Result<(), String> {
143 let args: CliArgs = CliArgs::parse();
145 args.validate()?;
146
147 let log_level = if args.verbose { "debug" } else { "info" };
149 let (non_blocking, _guard) =
150 tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
151 let subscriber = tracing_subscriber::fmt()
152 .with_env_filter(
153 tracing_subscriber::EnvFilter::try_from_default_env()
154 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level)),
155 )
156 .with_writer(non_blocking)
157 .finish();
158
159 tracing::subscriber::set_global_default(subscriber)
160 .map_err(|e| format!("Failed to set up logging subscriber: {e}"))?;
161
162 let exchanges: Vec<(String, Option<String>)> = if args.example {
166 vec![
172 (
173 "uniswap_v3".to_string(),
174 Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
175 ),
176 (
177 "uniswap_v2".to_string(),
178 Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
179 ),
180 ]
181 } else {
182 args.exchange
183 .iter()
184 .filter_map(|e| {
185 if e.contains('-') {
186 let parts: Vec<&str> = e.split('-').collect();
187 if parts.len() == 2 {
188 Some((parts[0].to_string(), Some(parts[1].to_string())))
189 } else {
190 warn!("Ignoring invalid exchange format: {}", e);
191 None
192 }
193 } else {
194 Some((e.to_string(), None))
195 }
196 })
197 .collect()
198 };
199
200 info!("Running with exchanges: {:?}", exchanges);
201
202 run(exchanges, args).await?;
203 Ok(())
204}
205
206async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<(), String> {
207 info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
208 let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
210 info!("Using non-secure connection: ws:// and http://");
211 let tycho_ws_url = format!("ws://{url}", url = &args.tycho_url);
212 let tycho_rpc_url = format!("http://{url}", url = &args.tycho_url);
213 (tycho_ws_url, tycho_rpc_url)
214 } else {
215 info!("Using secure connection: wss:// and https://");
216 let tycho_ws_url = format!("wss://{url}", url = &args.tycho_url);
217 let tycho_rpc_url = format!("https://{url}", url = &args.tycho_url);
218 (tycho_ws_url, tycho_rpc_url)
219 };
220
221 let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref())
222 .map_err(|e| format!("Failed to create WebSocket client: {e}"))?;
223 let rpc_client = HttpRPCClient::new(
224 &tycho_rpc_url,
225 HttpRPCClientOptions::new()
226 .with_auth_key(args.auth_key.clone())
227 .with_compression(!args.disable_compression),
228 )
229 .map_err(|e| format!("Failed to create RPC client: {e}"))?;
230 let chain = Chain::from_str(&args.chain)
231 .map_err(|_| format!("Unknown chain: {chain}", chain = &args.chain))?;
232 let ws_jh = ws_client
233 .connect()
234 .await
235 .map_err(|e| format!("WebSocket client connection error: {e}"))?;
236
237 let mut block_sync = BlockSynchronizer::new(
238 Duration::from_secs(args.block_time),
239 Duration::from_secs(args.timeout),
240 args.max_missed_blocks,
241 );
242
243 if let Some(mm) = &args.max_messages {
244 block_sync.max_messages(*mm);
245 }
246
247 let available_protocols_set = rpc_client
248 .get_protocol_systems(&ProtocolSystemsRequestBody {
249 chain,
250 pagination: PaginationParams { page: 0, page_size: 100 },
251 })
252 .await
253 .map_err(|e| format!("Failed to get protocol systems: {e}"))?
254 .protocol_systems
255 .into_iter()
256 .collect::<HashSet<_>>();
257
258 let requested_protocol_set = exchanges
259 .iter()
260 .map(|(name, _)| name.clone())
261 .collect::<HashSet<_>>();
262
263 let not_requested_protocols = available_protocols_set
264 .difference(&requested_protocol_set)
265 .cloned()
266 .collect::<Vec<_>>();
267
268 if !not_requested_protocols.is_empty() {
269 info!("Other available protocols: {}", not_requested_protocols.join(", "));
270 }
271
272 for (name, address) in exchanges {
273 debug!("Registering exchange: {}", name);
274 let id = ExtractorIdentity { chain, name: name.clone() };
275 let filter = if let Some(address) = address {
276 ComponentFilter::Ids(vec![address])
277 } else if let (Some(remove_tvl), Some(add_tvl)) =
278 (args.remove_tvl_threshold, args.add_tvl_threshold)
279 {
280 ComponentFilter::with_tvl_range(remove_tvl, add_tvl)
281 } else {
282 ComponentFilter::with_tvl_range(args.min_tvl, args.min_tvl)
283 };
284 let sync = ProtocolStateSynchronizer::new(
285 id.clone(),
286 true,
287 filter,
288 32,
289 Duration::from_secs(args.block_time / 2),
290 !args.no_state,
291 args.include_tvl,
292 !args.disable_compression,
293 rpc_client.clone(),
294 ws_client.clone(),
295 args.block_time + args.timeout,
296 )
297 .with_partial_blocks(args.partial_blocks);
298 block_sync = block_sync.register_synchronizer(id, sync);
299 }
300
301 let (sync_jh, mut rx) = block_sync
302 .run()
303 .await
304 .map_err(|e| format!("Failed to start block synchronizer: {e}"))?;
305
306 let msg_printer = tokio::spawn(async move {
307 while let Some(result) = rx.recv().await {
308 let msg =
309 result.map_err(|e| format!("Message printer received synchronizer error: {e}"))?;
310
311 if let Ok(msg_json) = serde_json::to_string(&msg) {
312 println!("{msg_json}");
313 } else {
314 error!("Failed to serialize FeedMessage");
316 };
317 }
318
319 Ok::<(), String>(())
320 });
321
322 let (failed_task, shutdown_reason) = tokio::select! {
324 res = ws_jh => (
325 "WebSocket",
326 extract_nested_error(res)
327 ),
328 res = sync_jh => (
329 "BlockSynchronizer",
330 extract_nested_error::<_, _, String>(Ok(res))
331 ),
332 res = msg_printer => (
333 "MessagePrinter",
334 extract_nested_error(res)
335 )
336 };
337
338 debug!("RX closed");
339 Err(format!(
340 "{failed_task} task terminated: {}",
341 shutdown_reason.unwrap_or("unknown reason".to_string())
342 ))
343}
344
345#[inline]
346fn extract_nested_error<T, E1: ToString, E2: ToString>(
347 res: Result<Result<T, E1>, E2>,
348) -> Option<String> {
349 res.map_err(|e| e.to_string())
350 .and_then(|r| r.map_err(|e| e.to_string()))
351 .err()
352}
353
354#[cfg(test)]
355mod cli_tests {
356 use clap::Parser;
357
358 use super::CliArgs;
359
360 #[tokio::test]
361 async fn test_cli_args() {
362 let args = CliArgs::parse_from([
363 "tycho-client",
364 "--tycho-url",
365 "localhost:5000",
366 "--exchange",
367 "uniswap_v2",
368 "--min-tvl",
369 "3000",
370 "--block-time",
371 "50",
372 "--timeout",
373 "5",
374 "--log-folder",
375 "test_logs",
376 "--example",
377 "--max-messages",
378 "1",
379 ]);
380 let exchanges: Vec<String> = vec!["uniswap_v2".to_string()];
381 assert_eq!(args.tycho_url, "localhost:5000");
382 assert_eq!(args.exchange, exchanges);
383 assert_eq!(args.min_tvl, 3000.0);
384 assert_eq!(args.block_time, 50);
385 assert_eq!(args.timeout, 5);
386 assert_eq!(args.log_folder, "test_logs");
387 assert_eq!(args.max_messages, Some(1));
388 assert!(args.example);
389 assert_eq!(args.disable_compression, false);
390 assert_eq!(args.partial_blocks, false);
391 }
392}