1use std::{path::Path, str::FromStr};
2
3use clap::Parser;
4use tracing::info;
5use tracing_appender::rolling;
6use tycho_common::{dto::TvlThresholdTier, models::Chain};
7
8use crate::{
9 feed::{component_tracker::ComponentFilter, dto::FeedMessage as FeedMessageDto},
10 stream::TychoStreamBuilder,
11};
12
13#[derive(Parser, Debug, Clone, PartialEq)]
18#[clap(version = env!("CARGO_PKG_VERSION"))]
19struct CliArgs {
20 #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
22 tycho_url: String,
23
24 #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
27 auth_key: Option<String>,
28
29 #[clap(long)]
31 no_tls: bool,
32
33 #[clap(short = 'c', long, default_value = "ethereum")]
35 pub chain: String,
36
37 #[clap(short = 'e', long, number_of_values = 1)]
40 exchange: Vec<String>,
41
42 #[clap(long)]
46 min_tvl: Option<f64>,
47
48 #[clap(long)]
51 remove_tvl_threshold: Option<f64>,
52
53 #[clap(long)]
56 add_tvl_threshold: Option<f64>,
57
58 #[clap(long)]
65 block_time: Option<u64>,
66
67 #[clap(long)]
71 timeout: Option<u64>,
72
73 #[clap(long, default_value = "logs")]
75 log_folder: String,
76
77 #[clap(long)]
79 example: bool,
80
81 #[clap(long)]
84 no_state: bool,
85
86 #[clap(short='n', long, default_value=None)]
90 max_messages: Option<usize>,
91
92 #[clap(long)]
96 max_missed_blocks: Option<u64>,
97
98 #[clap(long)]
102 include_tvl: bool,
103
104 #[clap(long)]
107 disable_compression: bool,
108
109 #[clap(long)]
113 partial_blocks: bool,
114
115 #[clap(long)]
118 verbose: bool,
119
120 #[clap(long, default_value = "32")]
122 max_retries: u64,
123
124 #[clap(long)]
126 blocklist_config: Option<std::path::PathBuf>,
127}
128
129impl CliArgs {
130 fn validate(&self) -> Result<(), String> {
131 match (self.remove_tvl_threshold, self.add_tvl_threshold) {
133 (Some(remove), Some(add)) if remove >= add => {
134 return Err("remove_tvl_threshold must be less than add_tvl_threshold".to_string());
135 }
136 (Some(_), None) | (None, Some(_)) => {
137 return Err(
138 "Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string()
139 );
140 }
141 _ => {}
142 }
143
144 Ok(())
145 }
146}
147
/// Shape of the blocklist configuration file as deserialized by `load_blocklist`.
#[derive(serde::Deserialize)]
struct BlocklistFile {
    /// Blocklisted ids, read from the file's top-level `ids` key as a list of strings.
    ids: Vec<String>,
}
152
153fn load_blocklist(path: &Path) -> Result<Vec<String>, String> {
154 let content = std::fs::read_to_string(path)
155 .map_err(|e| format!("Failed to read blocklist file {}: {e}", path.display()))?;
156 let file: BlocklistFile = toml::from_str(&content)
157 .map_err(|e| format!("Failed to parse blocklist file {}: {e}", path.display()))?;
158 Ok(file.ids)
159}
160
161pub async fn run_cli() -> Result<(), String> {
162 let args: CliArgs = CliArgs::parse();
164 args.validate()?;
165
166 let log_level = if args.verbose { "debug" } else { "info" };
168 let (non_blocking, _guard) =
169 tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
170 let subscriber = tracing_subscriber::fmt()
171 .with_env_filter(
172 tracing_subscriber::EnvFilter::try_from_default_env()
173 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level)),
174 )
175 .with_writer(non_blocking)
176 .finish();
177
178 tracing::subscriber::set_global_default(subscriber)
179 .map_err(|e| format!("Failed to set up logging subscriber: {e}"))?;
180
181 let exchanges: Vec<(String, Option<String>)> = if args.example {
185 vec![
191 (
192 "uniswap_v3".to_string(),
193 Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
194 ),
195 (
196 "uniswap_v2".to_string(),
197 Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
198 ),
199 ]
200 } else {
201 args.exchange
202 .iter()
203 .filter_map(|e| {
204 if e.contains('-') {
205 let parts: Vec<&str> = e.split('-').collect();
206 if parts.len() == 2 {
207 Some((parts[0].to_string(), Some(parts[1].to_string())))
208 } else {
209 tracing::warn!("Ignoring invalid exchange format: {}", e);
210 None
211 }
212 } else {
213 Some((e.to_string(), None))
214 }
215 })
216 .collect()
217 };
218
219 info!("Running with exchanges: {:?}", exchanges);
220
221 run(exchanges, args).await?;
222 Ok(())
223}
224
225async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<(), String> {
226 let blocklist = match &args.blocklist_config {
227 Some(path) => load_blocklist(path)?,
228 None => Vec::new(),
229 };
230
231 let chain = Chain::from_str(&args.chain)
232 .map_err(|_| format!("Unknown chain: {chain}", chain = args.chain))?;
233
234 let mut builder = TychoStreamBuilder::new(&args.tycho_url, chain)
235 .auth_key(args.auth_key)
236 .no_tls(args.no_tls)
237 .no_state(args.no_state)
238 .include_tvl(args.include_tvl)
239 .max_retries(args.max_retries)
240 .blocklisted_ids(blocklist);
241
242 if let Some(bt) = args.block_time {
243 builder = builder.block_time(bt);
244 }
245 if let Some(to) = args.timeout {
246 builder = builder.timeout(to);
247 }
248 if let Some(mmb) = args.max_missed_blocks {
249 builder = builder.max_missed_blocks(mmb);
250 }
251 if args.disable_compression {
252 builder = builder.disable_compression();
253 }
254 if args.partial_blocks {
255 builder = builder.enable_partial_blocks();
256 }
257 if let Some(n) = args.max_messages {
258 builder = builder.max_messages(n);
259 }
260 let builder = exchanges
262 .into_iter()
263 .fold(builder, |b, (name, address)| {
264 let filter = if let Some(addr) = address {
265 ComponentFilter::Ids(vec![addr])
266 } else if let (Some(remove_tvl), Some(add_tvl)) =
267 (args.remove_tvl_threshold, args.add_tvl_threshold)
268 {
269 ComponentFilter::with_tvl_range(remove_tvl, add_tvl)
270 } else {
271 let default_min_tvl = chain.default_tvl_threshold(TvlThresholdTier::Low);
272 let min_tvl = args.min_tvl.unwrap_or(default_min_tvl);
273 ComponentFilter::with_tvl_range(min_tvl, min_tvl)
274 };
275 b.exchange(&name, filter)
276 });
277
278 let (handle, mut rx) = builder
279 .build()
280 .await
281 .map_err(|e| e.to_string())?;
282
283 let msg_printer = tokio::spawn(async move {
284 while let Some(result) = rx.recv().await {
285 let msg =
286 result.map_err(|e| format!("Message printer received synchronizer error: {e}"))?;
287
288 let json = serde_json::to_string(&FeedMessageDto::from(msg))
289 .map_err(|e| format!("Message printer failed to serialize message: {e}"))?;
290 println!("{json}");
291 }
292
293 Ok::<(), String>(())
294 });
295
296 let (failed_task, shutdown_reason) = tokio::select! {
298 res = handle => (
299 "Stream",
300 res.err().map(|e| e.to_string())
301 ),
302 res = msg_printer => (
303 "MessagePrinter",
304 extract_nested_error(res)
305 ),
306 };
307
308 Err(format!(
309 "{failed_task} task terminated: {}",
310 shutdown_reason.unwrap_or("unknown reason".to_string())
311 ))
312}
313
/// Flattens a two-level `Result` into the message of whichever level failed.
///
/// Returns `None` when both levels are `Ok`. An outer `Err` is reported as is; otherwise
/// the inner `Err` is reported. Either error is rendered through `ToString`.
#[inline]
fn extract_nested_error<T, E1: ToString, E2: ToString>(
    res: Result<Result<T, E1>, E2>,
) -> Option<String> {
    match res {
        Err(outer) => Some(outer.to_string()),
        Ok(Err(inner)) => Some(inner.to_string()),
        Ok(Ok(_)) => None,
    }
}
322
#[cfg(test)]
mod cli_tests {
    use clap::Parser;

    use super::CliArgs;

    /// Parses a representative command line and checks that every supplied flag lands in
    /// the matching field, while flags that were not passed keep their `false` default.
    #[tokio::test]
    async fn test_cli_args() {
        let argv = [
            "tycho-client",
            "--tycho-url",
            "localhost:5000",
            "--exchange",
            "uniswap_v2",
            "--min-tvl",
            "3000",
            "--block-time",
            "50",
            "--timeout",
            "5",
            "--log-folder",
            "test_logs",
            "--example",
            "--max-messages",
            "1",
            "--blocklist-config",
            "blocklist.toml",
        ];
        let args = CliArgs::parse_from(argv);

        // Flags given on the command line.
        assert_eq!(args.tycho_url, "localhost:5000");
        assert_eq!(args.exchange, vec!["uniswap_v2".to_string()]);
        assert_eq!(args.min_tvl, Some(3000.0));
        assert_eq!(args.block_time, Some(50));
        assert_eq!(args.timeout, Some(5));
        assert_eq!(args.log_folder, "test_logs");
        assert_eq!(args.max_messages, Some(1));
        assert!(args.example);
        assert_eq!(
            args.blocklist_config,
            Some(std::path::PathBuf::from("blocklist.toml"))
        );

        // Switches that were not passed.
        assert!(!args.disable_compression);
        assert!(!args.partial_blocks);
    }
}