1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use clap::Parser;
4use tracing::{debug, error, info, warn};
5use tracing_appender::rolling;
6use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
7
8use crate::{
9 deltas::DeltasClient,
10 feed::{
11 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
12 BlockSynchronizer,
13 },
14 rpc::RPCClient,
15 HttpRPCClient, WsDeltasClient,
16};
17
18#[derive(Parser, Debug, Clone, PartialEq)]
23#[clap(version = env!("CARGO_PKG_VERSION"))]
24struct CliArgs {
25 #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
27 tycho_url: String,
28
29 #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
32 auth_key: Option<String>,
33
34 #[clap(long)]
36 no_tls: bool,
37
38 #[clap(short = 'c', long, default_value = "ethereum")]
40 pub chain: String,
41
42 #[clap(short = 'e', long, number_of_values = 1)]
45 exchange: Vec<String>,
46
47 #[clap(long, default_value = "10")]
50 min_tvl: u32,
51
52 #[clap(long)]
55 remove_tvl_threshold: Option<u32>,
56
57 #[clap(long)]
60 add_tvl_threshold: Option<u32>,
61
62 #[clap(long, default_value = "600")]
69 block_time: u64,
70
71 #[clap(long, default_value = "1")]
74 timeout: u64,
75
76 #[clap(long, default_value = "logs")]
78 log_folder: String,
79
80 #[clap(long)]
82 example: bool,
83
84 #[clap(long)]
87 no_state: bool,
88
89 #[clap(short='n', long, default_value=None)]
93 max_messages: Option<usize>,
94
95 #[clap(long, default_value = "10")]
98 max_missed_blocks: u64,
99
100 #[clap(long)]
104 include_tvl: bool,
105
106 #[clap(long)]
109 verbose: bool,
110}
111
112impl CliArgs {
113 fn validate(&self) -> Result<(), String> {
114 if self.remove_tvl_threshold.is_some() != self.add_tvl_threshold.is_some() {
116 return Err("Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string());
117 } else if self.remove_tvl_threshold.is_some() &&
118 self.add_tvl_threshold.is_some() &&
119 self.remove_tvl_threshold.unwrap() >= self.add_tvl_threshold.unwrap()
120 {
121 return Err("remove_tvl_threshold must be less than add_tvl_threshold".to_string());
122 }
123
124 Ok(())
125 }
126}
127
128pub async fn run_cli() -> Result<(), String> {
129 let args: CliArgs = CliArgs::parse();
131 args.validate()?;
132
133 let log_level = if args.verbose { "debug" } else { "info" };
135 let (non_blocking, _guard) =
136 tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
137 let subscriber = tracing_subscriber::fmt()
138 .with_env_filter(
139 tracing_subscriber::EnvFilter::try_from_default_env()
140 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level)),
141 )
142 .with_writer(non_blocking)
143 .finish();
144
145 tracing::subscriber::set_global_default(subscriber)
146 .map_err(|e| format!("Failed to set up logging subscriber: {e}"))?;
147
148 if args.example {
150 let exchanges = vec![
158 (
159 "uniswap_v3".to_string(),
160 Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
161 ),
162 (
163 "uniswap_v2".to_string(),
164 Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
165 ),
166 ];
167 run(exchanges, args).await?;
168 return Ok(());
169 }
170
171 let exchanges: Vec<(String, Option<String>)> = args
173 .exchange
174 .iter()
175 .filter_map(|e| {
176 if e.contains('-') {
177 let parts: Vec<&str> = e.split('-').collect();
178 if parts.len() == 2 {
179 Some((parts[0].to_string(), Some(parts[1].to_string())))
180 } else {
181 warn!("Ignoring invalid exchange format: {}", e);
182 None
183 }
184 } else {
185 Some((e.to_string(), None))
186 }
187 })
188 .collect();
189
190 info!("Running with exchanges: {:?}", exchanges);
191
192 run(exchanges, args).await?;
193 Ok(())
194}
195
196async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<(), String> {
197 info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
198 let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
200 info!("Using non-secure connection: ws:// and http://");
201 let tycho_ws_url = format!("ws://{url}", url = &args.tycho_url);
202 let tycho_rpc_url = format!("http://{url}", url = &args.tycho_url);
203 (tycho_ws_url, tycho_rpc_url)
204 } else {
205 info!("Using secure connection: wss:// and https://");
206 let tycho_ws_url = format!("wss://{url}", url = &args.tycho_url);
207 let tycho_rpc_url = format!("https://{url}", url = &args.tycho_url);
208 (tycho_ws_url, tycho_rpc_url)
209 };
210
211 let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref())
212 .map_err(|e| format!("Failed to create WebSocket client: {e}"))?;
213 let rpc_client = HttpRPCClient::new(&tycho_rpc_url, args.auth_key.as_deref())
214 .map_err(|e| format!("Failed to create RPC client: {e}"))?;
215 let chain = Chain::from_str(&args.chain)
216 .map_err(|_| format!("Unknown chain: {chain}", chain = &args.chain))?;
217 let ws_jh = ws_client
218 .connect()
219 .await
220 .map_err(|e| format!("WebSocket client connection error: {e}"))?;
221
222 let mut block_sync = BlockSynchronizer::new(
223 Duration::from_secs(args.block_time),
224 Duration::from_secs(args.timeout),
225 args.max_missed_blocks,
226 );
227
228 if let Some(mm) = &args.max_messages {
229 block_sync.max_messages(*mm);
230 }
231
232 let available_protocols_set = rpc_client
233 .get_protocol_systems(&ProtocolSystemsRequestBody {
234 chain,
235 pagination: PaginationParams { page: 0, page_size: 100 },
236 })
237 .await
238 .map_err(|e| format!("Failed to get protocol systems: {e}"))?
239 .protocol_systems
240 .into_iter()
241 .collect::<HashSet<_>>();
242
243 let requested_protocol_set = exchanges
244 .iter()
245 .map(|(name, _)| name.clone())
246 .collect::<HashSet<_>>();
247
248 let not_requested_protocols = available_protocols_set
249 .difference(&requested_protocol_set)
250 .cloned()
251 .collect::<Vec<_>>();
252
253 if !not_requested_protocols.is_empty() {
254 info!("Other available protocols: {}", not_requested_protocols.join(", "));
255 }
256
257 for (name, address) in exchanges {
258 debug!("Registering exchange: {}", name);
259 let id = ExtractorIdentity { chain, name: name.clone() };
260 let filter = if let Some(address) = address {
261 ComponentFilter::Ids(vec![address])
262 } else if let (Some(remove_tvl), Some(add_tvl)) =
263 (args.remove_tvl_threshold, args.add_tvl_threshold)
264 {
265 ComponentFilter::with_tvl_range(remove_tvl as f64, add_tvl as f64)
266 } else {
267 ComponentFilter::with_tvl_range(args.min_tvl as f64, args.min_tvl as f64)
268 };
269 let sync = ProtocolStateSynchronizer::new(
270 id.clone(),
271 true,
272 filter,
273 32,
274 Duration::from_secs(args.block_time / 2),
275 !args.no_state,
276 args.include_tvl,
277 rpc_client.clone(),
278 ws_client.clone(),
279 args.block_time + args.timeout,
280 );
281 block_sync = block_sync.register_synchronizer(id, sync);
282 }
283
284 let (sync_jh, mut rx) = block_sync
285 .run()
286 .await
287 .map_err(|e| format!("Failed to start block synchronizer: {e}"))?;
288
289 let msg_printer = tokio::spawn(async move {
290 while let Some(Ok(msg)) = rx.recv().await {
291 if let Ok(msg_json) = serde_json::to_string(&msg) {
292 println!("{msg_json}");
293 } else {
294 error!("Failed to serialize FeedMessage");
295 }
296 }
297 });
298
299 tokio::select! {
301 res = ws_jh => {
302 if let Err(e) = res {
303 error!("WebSocket connection dropped unexpectedly: {}", e);
304 }
305 }
306 res = sync_jh => {
307 if let Err(e) = res {
308 error!("BlockSynchronizer stopped unexpectedly: {}", e);
309 }
310 }
311 res = msg_printer => {
312 if let Err(e) = res {
313 error!("Message printer stopped unexpectedly: {}", e);
314 }
315 }
316 }
317
318 debug!("RX closed");
319 Ok(())
320}
321
#[cfg(test)]
mod cli_tests {
    use clap::Parser;

    use super::CliArgs;

    /// Parses a representative command line and checks every option that was passed
    /// explicitly.
    #[tokio::test]
    async fn test_cli_args() {
        // Program name first, then one option (and its value, if any) per row.
        #[rustfmt::skip]
        let argv = [
            "tycho-client",
            "--tycho-url", "localhost:5000",
            "--exchange", "uniswap_v2",
            "--min-tvl", "3000",
            "--block-time", "50",
            "--timeout", "5",
            "--log-folder", "test_logs",
            "--example",
            "--max-messages", "1",
        ];

        let parsed = CliArgs::parse_from(argv);

        assert_eq!(parsed.tycho_url, "localhost:5000");
        assert_eq!(parsed.exchange, vec!["uniswap_v2".to_string()]);
        assert_eq!(parsed.min_tvl, 3000);
        assert_eq!(parsed.block_time, 50);
        assert_eq!(parsed.timeout, 5);
        assert_eq!(parsed.log_folder, "test_logs");
        assert_eq!(parsed.max_messages, Some(1));
        assert!(parsed.example);
    }
}