1use std::{collections::HashSet, path::Path, str::FromStr, time::Duration};
2
3use clap::Parser;
4use tracing::{debug, error, info, warn};
5use tracing_appender::rolling;
6use tycho_common::dto::{Chain, ExtractorIdentity};
7
8use crate::{
9 deltas::DeltasClient,
10 feed::{
11 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
12 BlockSynchronizer,
13 },
14 rpc::HttpRPCClientOptions,
15 stream::ProtocolSystemsInfo,
16 HttpRPCClient, WsDeltasClient,
17};
18
// Command-line arguments of the client binary, parsed by clap's derive API.
//
// NOTE: plain `//` comments are used here on purpose. clap turns `///` doc comments on a
// `Parser` struct and its fields into `--help` text, so adding them would change the
// program's output.
#[derive(Parser, Debug, Clone, PartialEq)]
#[clap(version = env!("CARGO_PKG_VERSION"))]
struct CliArgs {
    // Host (and port) of the Tycho server, without a scheme: `run` prepends
    // `ws://`/`http://` or `wss://`/`https://` itself. Can also be set via `TYCHO_URL`.
    #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
    tycho_url: String,

    // Optional auth key handed to both the WebSocket and the RPC client. Can also be set
    // via `TYCHO_AUTH_TOKEN`. When absent, `run` uses the non-secure schemes.
    #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
    auth_key: Option<String>,

    // Forces the non-secure `ws://` / `http://` schemes even if an auth key is present.
    #[clap(long)]
    no_tls: bool,

    // Chain name; `run` parses it with `Chain::from_str` and fails on unknown values.
    #[clap(short = 'c', long, default_value = "ethereum")]
    pub chain: String,

    // Exchanges to follow; the flag may be repeated, one value per occurrence.
    // Each value is either `name` or `name-id` (split on '-'); values with more than one
    // '-' are ignored with a warning. Not used at all when `--example` is set.
    #[clap(short = 'e', long, number_of_values = 1)]
    exchange: Vec<String>,

    // TVL value used for both bounds of the component filter's TVL range when the two
    // explicit thresholds below are not set.
    #[clap(long, default_value = "10")]
    min_tvl: f64,

    // Lower bound of the TVL range. Must be given together with `add_tvl_threshold` and
    // be smaller than it (see `validate`).
    #[clap(long)]
    remove_tvl_threshold: Option<f64>,

    // Upper bound of the TVL range. Must be given together with `remove_tvl_threshold`
    // (see `validate`).
    #[clap(long)]
    add_tvl_threshold: Option<f64>,

    // Block time in seconds (converted with `Duration::from_secs`). `run` also passes
    // half of it and `block_time + timeout` to each protocol synchronizer.
    #[clap(long, default_value = "600")]
    block_time: u64,

    // Timeout in seconds (converted with `Duration::from_secs`) handed to the block
    // synchronizer.
    #[clap(long, default_value = "1")]
    timeout: u64,

    // Directory in which the `dev_logs.log` log file is written.
    #[clap(long, default_value = "logs")]
    log_folder: String,

    // Replaces the `--exchange` list with two hard-coded uniswap_v3 / uniswap_v2 entries.
    #[clap(long)]
    example: bool,

    // Passed negated to `ProtocolStateSynchronizer::new`.
    // NOTE(review): presumably disables fetching protocol state - confirm against the
    // synchronizer's constructor.
    #[clap(long)]
    no_state: bool,

    // When set, forwarded to `BlockSynchronizer::max_messages`.
    // NOTE(review): presumably stops the stream after that many messages - confirm.
    #[clap(short='n', long, default_value=None)]
    max_messages: Option<usize>,

    // Forwarded as the third argument of `BlockSynchronizer::new`.
    #[clap(long, default_value = "10")]
    max_missed_blocks: u64,

    // Forwarded to `ProtocolStateSynchronizer::new`.
    // NOTE(review): presumably adds TVL data to the emitted messages - confirm.
    #[clap(long)]
    include_tvl: bool,

    // Turns compression off: the negated flag is passed to the RPC client options
    // (`with_compression`) and to each protocol synchronizer.
    #[clap(long)]
    disable_compression: bool,

    // Forwarded to `ProtocolStateSynchronizer::with_partial_blocks`.
    #[clap(long)]
    partial_blocks: bool,

    // Uses "debug" instead of "info" as the log filter when no filter is supplied
    // through the environment.
    #[clap(long)]
    verbose: bool,

    // Forwarded to `ProtocolStateSynchronizer::new`.
    // NOTE(review): presumably the retry budget of each synchronizer - confirm.
    #[clap(long, default_value = "32")]
    max_retries: u64,

    // Optional path to a TOML file with an `ids` array; its entries are applied as a
    // blocklist to every exchange's component filter.
    #[clap(long)]
    blocklist_config: Option<std::path::PathBuf>,
}
131
132impl CliArgs {
133 fn validate(&self) -> Result<(), String> {
134 match (self.remove_tvl_threshold, self.add_tvl_threshold) {
136 (Some(remove), Some(add)) if remove >= add => {
137 return Err("remove_tvl_threshold must be less than add_tvl_threshold".to_string());
138 }
139 (Some(_), None) | (None, Some(_)) => {
140 return Err(
141 "Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string()
142 );
143 }
144 _ => {}
145 }
146
147 Ok(())
148 }
149}
150
/// Shape of the blocklist configuration file.
///
/// `load_blocklist` deserializes the file from TOML into this struct, so the file must
/// contain a top-level `ids` array of strings.
#[derive(serde::Deserialize)]
struct BlocklistFile {
    /// Entries that `run` passes to the `blocklist` method of every exchange's component
    /// filter.
    // NOTE(review): presumably component ids - confirm against `ComponentFilter`.
    ids: Vec<String>,
}
155
156fn load_blocklist(path: &Path) -> Result<Vec<String>, String> {
157 let content = std::fs::read_to_string(path)
158 .map_err(|e| format!("Failed to read blocklist file {}: {e}", path.display()))?;
159 let file: BlocklistFile = toml::from_str(&content)
160 .map_err(|e| format!("Failed to parse blocklist file {}: {e}", path.display()))?;
161 Ok(file.ids)
162}
163
164pub async fn run_cli() -> Result<(), String> {
165 let args: CliArgs = CliArgs::parse();
167 args.validate()?;
168
169 let log_level = if args.verbose { "debug" } else { "info" };
171 let (non_blocking, _guard) =
172 tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
173 let subscriber = tracing_subscriber::fmt()
174 .with_env_filter(
175 tracing_subscriber::EnvFilter::try_from_default_env()
176 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level)),
177 )
178 .with_writer(non_blocking)
179 .finish();
180
181 tracing::subscriber::set_global_default(subscriber)
182 .map_err(|e| format!("Failed to set up logging subscriber: {e}"))?;
183
184 let exchanges: Vec<(String, Option<String>)> = if args.example {
188 vec![
194 (
195 "uniswap_v3".to_string(),
196 Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
197 ),
198 (
199 "uniswap_v2".to_string(),
200 Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
201 ),
202 ]
203 } else {
204 args.exchange
205 .iter()
206 .filter_map(|e| {
207 if e.contains('-') {
208 let parts: Vec<&str> = e.split('-').collect();
209 if parts.len() == 2 {
210 Some((parts[0].to_string(), Some(parts[1].to_string())))
211 } else {
212 warn!("Ignoring invalid exchange format: {}", e);
213 None
214 }
215 } else {
216 Some((e.to_string(), None))
217 }
218 })
219 .collect()
220 };
221
222 info!("Running with exchanges: {:?}", exchanges);
223
224 run(exchanges, args).await?;
225 Ok(())
226}
227
228async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<(), String> {
229 let blocklist = match &args.blocklist_config {
230 Some(path) => load_blocklist(path)?,
231 None => Vec::new(),
232 };
233
234 info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
235 let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
237 info!("Using non-secure connection: ws:// and http://");
238 let tycho_ws_url = format!("ws://{url}", url = &args.tycho_url);
239 let tycho_rpc_url = format!("http://{url}", url = &args.tycho_url);
240 (tycho_ws_url, tycho_rpc_url)
241 } else {
242 info!("Using secure connection: wss:// and https://");
243 let tycho_ws_url = format!("wss://{url}", url = &args.tycho_url);
244 let tycho_rpc_url = format!("https://{url}", url = &args.tycho_url);
245 (tycho_ws_url, tycho_rpc_url)
246 };
247
248 let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref())
249 .map_err(|e| format!("Failed to create WebSocket client: {e}"))?;
250 let rpc_client = HttpRPCClient::new(
251 &tycho_rpc_url,
252 HttpRPCClientOptions::new()
253 .with_auth_key(args.auth_key.clone())
254 .with_compression(!args.disable_compression),
255 )
256 .map_err(|e| format!("Failed to create RPC client: {e}"))?;
257 let chain = Chain::from_str(&args.chain)
258 .map_err(|_| format!("Unknown chain: {chain}", chain = &args.chain))?;
259 let ws_jh = ws_client
260 .connect()
261 .await
262 .map_err(|e| format!("WebSocket client connection error: {e}"))?;
263
264 let mut block_sync = BlockSynchronizer::new(
265 Duration::from_secs(args.block_time),
266 Duration::from_secs(args.timeout),
267 args.max_missed_blocks,
268 );
269
270 if let Some(mm) = &args.max_messages {
271 block_sync.max_messages(*mm);
272 }
273
274 let requested_protocol_set: HashSet<_> = exchanges
275 .iter()
276 .map(|(name, _)| name.clone())
277 .collect();
278 let protocol_info =
279 ProtocolSystemsInfo::fetch(&rpc_client, chain, &requested_protocol_set).await;
280 protocol_info.log_other_available();
281 let dci_protocols = protocol_info.dci_protocols;
282
283 for (name, address) in exchanges {
284 debug!("Registering exchange: {}", name);
285 let id = ExtractorIdentity { chain, name: name.clone() };
286 let filter = if let Some(address) = address {
287 ComponentFilter::Ids(vec![address])
288 } else if let (Some(remove_tvl), Some(add_tvl)) =
289 (args.remove_tvl_threshold, args.add_tvl_threshold)
290 {
291 ComponentFilter::with_tvl_range(remove_tvl, add_tvl)
292 } else {
293 ComponentFilter::with_tvl_range(args.min_tvl, args.min_tvl)
294 }
295 .blocklist(blocklist.clone());
296 let uses_dci = dci_protocols.contains(&name);
297 let sync = ProtocolStateSynchronizer::new(
298 id.clone(),
299 true,
300 filter,
301 args.max_retries,
302 Duration::from_secs(args.block_time / 2),
303 !args.no_state,
304 args.include_tvl,
305 !args.disable_compression,
306 rpc_client.clone(),
307 ws_client.clone(),
308 args.block_time + args.timeout,
309 )
310 .with_dci(uses_dci)
311 .with_partial_blocks(args.partial_blocks);
312 block_sync = block_sync.register_synchronizer(id, sync);
313 }
314
315 let (sync_jh, mut rx) = block_sync
316 .run()
317 .await
318 .map_err(|e| format!("Failed to start block synchronizer: {e}"))?;
319
320 let msg_printer = tokio::spawn(async move {
321 while let Some(result) = rx.recv().await {
322 let msg =
323 result.map_err(|e| format!("Message printer received synchronizer error: {e}"))?;
324
325 if let Ok(msg_json) = serde_json::to_string(&msg) {
326 println!("{msg_json}");
327 } else {
328 error!("Failed to serialize FeedMessage");
330 };
331 }
332
333 Ok::<(), String>(())
334 });
335
336 let (failed_task, shutdown_reason) = tokio::select! {
338 res = ws_jh => (
339 "WebSocket",
340 extract_nested_error(res)
341 ),
342 res = sync_jh => (
343 "BlockSynchronizer",
344 extract_nested_error::<_, _, String>(Ok(res))
345 ),
346 res = msg_printer => (
347 "MessagePrinter",
348 extract_nested_error(res)
349 )
350 };
351
352 debug!("RX closed");
353 Err(format!(
354 "{failed_task} task terminated: {}",
355 shutdown_reason.unwrap_or("unknown reason".to_string())
356 ))
357}
358
/// Flattens a doubly wrapped result into the message of whichever layer failed.
///
/// Returns the outer error's text when the outer result is `Err`, the inner error's text
/// when only the inner result is `Err`, and `None` when both layers succeeded.
#[inline]
fn extract_nested_error<T, E1: ToString, E2: ToString>(
    res: Result<Result<T, E1>, E2>,
) -> Option<String> {
    match res {
        Err(outer) => Some(outer.to_string()),
        Ok(Err(inner)) => Some(inner.to_string()),
        Ok(Ok(_)) => None,
    }
}
367
#[cfg(test)]
mod cli_tests {
    use clap::Parser;

    use super::CliArgs;

    /// Flags given on the command line end up in the matching fields, and flags that are
    /// left out keep their `false` default.
    // Argument parsing is synchronous, so a plain `#[test]` is enough; no async runtime
    // is needed.
    #[test]
    fn test_cli_args() {
        let args = CliArgs::parse_from([
            "tycho-client",
            "--tycho-url",
            "localhost:5000",
            "--exchange",
            "uniswap_v2",
            "--min-tvl",
            "3000",
            "--block-time",
            "50",
            "--timeout",
            "5",
            "--log-folder",
            "test_logs",
            "--example",
            "--max-messages",
            "1",
            "--blocklist-config",
            "blocklist.toml",
        ]);
        let exchanges: Vec<String> = vec!["uniswap_v2".to_string()];
        assert_eq!(args.tycho_url, "localhost:5000");
        assert_eq!(args.exchange, exchanges);
        assert_eq!(args.min_tvl, 3000.0);
        assert_eq!(args.block_time, 50);
        assert_eq!(args.timeout, 5);
        assert_eq!(args.log_folder, "test_logs");
        assert_eq!(args.max_messages, Some(1));
        assert!(args.example);
        assert!(!args.disable_compression);
        assert!(!args.partial_blocks);
        assert_eq!(args.blocklist_config, Some(std::path::PathBuf::from("blocklist.toml")));
    }

    /// The TVL thresholds are accepted when both are omitted or when they are given as an
    /// ordered pair.
    #[test]
    fn test_validate_accepts_consistent_thresholds() {
        let unset = CliArgs::parse_from(["tycho-client"]);
        assert!(unset.validate().is_ok());

        let ordered = CliArgs::parse_from([
            "tycho-client",
            "--remove-tvl-threshold",
            "50",
            "--add-tvl-threshold",
            "100",
        ]);
        assert!(ordered.validate().is_ok());
    }

    /// A single threshold, or a remove threshold that is not below the add threshold, is
    /// rejected.
    #[test]
    fn test_validate_rejects_inconsistent_thresholds() {
        let only_remove =
            CliArgs::parse_from(["tycho-client", "--remove-tvl-threshold", "50"]);
        assert!(only_remove.validate().is_err());

        let only_add = CliArgs::parse_from(["tycho-client", "--add-tvl-threshold", "50"]);
        assert!(only_add.validate().is_err());

        let unordered = CliArgs::parse_from([
            "tycho-client",
            "--remove-tvl-threshold",
            "100",
            "--add-tvl-threshold",
            "50",
        ]);
        assert!(unordered.validate().is_err());
    }
}