1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use clap::Parser;
4use tracing::{debug, error, info, warn};
5use tracing_appender::rolling;
6use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
7
8use crate::{
9 deltas::DeltasClient,
10 feed::{
11 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
12 BlockSynchronizer,
13 },
14 rpc::RPCClient,
15 HttpRPCClient, WsDeltasClient,
16};
17
18#[derive(Parser, Debug, Clone, PartialEq)]
19#[clap(version = env!("CARGO_PKG_VERSION"))]
20struct CliArgs {
21 #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
23 tycho_url: String,
24
25 #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
28 auth_key: Option<String>,
29
30 #[clap(long)]
32 no_tls: bool,
33
34 #[clap(long, default_value = "ethereum")]
36 pub chain: String,
37
38 #[clap(long, number_of_values = 1)]
40 exchange: Vec<String>,
41
42 #[clap(long, default_value = "10")]
45 min_tvl: u32,
46
47 #[clap(long)]
50 remove_tvl_threshold: Option<u32>,
51
52 #[clap(long)]
55 add_tvl_threshold: Option<u32>,
56
57 #[clap(long, default_value = "600")]
64 block_time: u64,
65
66 #[clap(long, default_value = "1")]
69 timeout: u64,
70
71 #[clap(long, default_value = "logs")]
73 log_folder: String,
74
75 #[clap(long)]
77 example: bool,
78
79 #[clap(long)]
82 no_state: bool,
83
84 #[clap(short='n', long, default_value=None)]
88 max_messages: Option<usize>,
89
90 #[clap(long, default_value = "10")]
93 max_missed_blocks: u64,
94
95 #[clap(long)]
99 include_tvl: bool,
100}
101
102impl CliArgs {
103 fn validate(&self) -> Result<(), String> {
104 if self.remove_tvl_threshold.is_some() != self.add_tvl_threshold.is_some() {
106 return Err("Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string());
107 }
108 Ok(())
109 }
110}
111
112pub async fn run_cli() -> Result<(), String> {
113 let args: CliArgs = CliArgs::parse();
115 args.validate()?;
116
117 let (non_blocking, _guard) =
119 tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
120 let subscriber = tracing_subscriber::fmt()
121 .with_env_filter(
122 tracing_subscriber::EnvFilter::try_from_default_env()
123 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
124 )
125 .with_writer(non_blocking)
126 .finish();
127
128 tracing::subscriber::set_global_default(subscriber)
129 .map_err(|e| format!("Failed to set up logging subscriber: {e}"))?;
130
131 if args.example {
133 let exchanges = vec![
141 (
142 "uniswap_v3".to_string(),
143 Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
144 ),
145 (
146 "uniswap_v2".to_string(),
147 Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
148 ),
149 ];
150 run(exchanges, args).await?;
151 return Ok(());
152 }
153
154 let exchanges: Vec<(String, Option<String>)> = args
156 .exchange
157 .iter()
158 .filter_map(|e| {
159 if e.contains('-') {
160 let parts: Vec<&str> = e.split('-').collect();
161 if parts.len() == 2 {
162 Some((parts[0].to_string(), Some(parts[1].to_string())))
163 } else {
164 warn!("Ignoring invalid exchange format: {}", e);
165 None
166 }
167 } else {
168 Some((e.to_string(), None))
169 }
170 })
171 .collect();
172
173 info!("Running with exchanges: {:?}", exchanges);
174
175 run(exchanges, args).await?;
176 Ok(())
177}
178
179async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<(), String> {
180 let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
182 info!("Using non-secure connection: ws:// and http://");
183 let tycho_ws_url = format!("ws://{url}", url = &args.tycho_url);
184 let tycho_rpc_url = format!("http://{url}", url = &args.tycho_url);
185 (tycho_ws_url, tycho_rpc_url)
186 } else {
187 info!("Using secure connection: wss:// and https://");
188 let tycho_ws_url = format!("wss://{url}", url = &args.tycho_url);
189 let tycho_rpc_url = format!("https://{url}", url = &args.tycho_url);
190 (tycho_ws_url, tycho_rpc_url)
191 };
192
193 let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref())
194 .map_err(|e| format!("Failed to create WebSocket client: {e}"))?;
195 let rpc_client = HttpRPCClient::new(&tycho_rpc_url, args.auth_key.as_deref())
196 .map_err(|e| format!("Failed to create RPC client: {e}"))?;
197 let chain = Chain::from_str(&args.chain)
198 .map_err(|_| format!("Unknown chain: {chain}", chain = &args.chain))?;
199 let ws_jh = ws_client
200 .connect()
201 .await
202 .map_err(|e| format!("WebSocket client connection error: {e}"))?;
203
204 let mut block_sync = BlockSynchronizer::new(
205 Duration::from_secs(args.block_time),
206 Duration::from_secs(args.timeout),
207 args.max_missed_blocks,
208 );
209
210 if let Some(mm) = &args.max_messages {
211 block_sync.max_messages(*mm);
212 }
213
214 let available_protocols_set = rpc_client
215 .get_protocol_systems(&ProtocolSystemsRequestBody {
216 chain,
217 pagination: PaginationParams { page: 0, page_size: 100 },
218 })
219 .await
220 .map_err(|e| format!("Failed to get protocol systems: {e}"))?
221 .protocol_systems
222 .into_iter()
223 .collect::<HashSet<_>>();
224
225 let requested_protocol_set = exchanges
226 .iter()
227 .map(|(name, _)| name.clone())
228 .collect::<HashSet<_>>();
229
230 let not_requested_protocols = available_protocols_set
231 .difference(&requested_protocol_set)
232 .cloned()
233 .collect::<Vec<_>>();
234
235 if !not_requested_protocols.is_empty() {
236 info!("Other available protocols: {}", not_requested_protocols.join(", "));
237 }
238
239 for (name, address) in exchanges {
240 debug!("Registering exchange: {}", name);
241 let id = ExtractorIdentity { chain, name: name.clone() };
242 let filter = if address.is_some() {
243 ComponentFilter::Ids(vec![address.unwrap()])
244 } else if let (Some(remove_tvl), Some(add_tvl)) =
245 (args.remove_tvl_threshold, args.add_tvl_threshold)
246 {
247 ComponentFilter::with_tvl_range(remove_tvl as f64, add_tvl as f64)
248 } else {
249 ComponentFilter::with_tvl_range(args.min_tvl as f64, args.min_tvl as f64)
250 };
251 let sync = ProtocolStateSynchronizer::new(
252 id.clone(),
253 true,
254 filter,
255 3,
256 !args.no_state,
257 args.include_tvl,
258 rpc_client.clone(),
259 ws_client.clone(),
260 args.block_time + args.timeout,
261 );
262 block_sync = block_sync.register_synchronizer(id, sync);
263 }
264
265 let (sync_jh, mut rx) = block_sync
266 .run()
267 .await
268 .map_err(|e| format!("Failed to start block synchronizer: {e}"))?;
269
270 let msg_printer = tokio::spawn(async move {
271 while let Some(msg) = rx.recv().await {
272 if let Ok(msg_json) = serde_json::to_string(&msg) {
273 println!("{msg_json}");
274 } else {
275 error!("Failed to serialize FeedMessage");
276 }
277 }
278 });
279
280 tokio::select! {
282 res = ws_jh => {
283 if let Err(e) = res {
284 error!("WebSocket connection dropped unexpectedly: {}", e);
285 }
286 }
287 res = sync_jh => {
288 if let Err(e) = res {
289 error!("BlockSynchronizer stopped unexpectedly: {}", e);
290 }
291 }
292 res = msg_printer => {
293 if let Err(e) = res {
294 error!("Message printer stopped unexpectedly: {}", e);
295 }
296 }
297 }
298
299 debug!("RX closed");
300 Ok(())
301}
302
303#[cfg(test)]
304mod cli_tests {
305 use clap::Parser;
306
307 use super::CliArgs;
308
309 #[tokio::test]
310 async fn test_cli_args() {
311 let args = CliArgs::parse_from([
312 "tycho-client",
313 "--tycho-url",
314 "localhost:5000",
315 "--exchange",
316 "uniswap_v2",
317 "--min-tvl",
318 "3000",
319 "--block-time",
320 "50",
321 "--timeout",
322 "5",
323 "--log-folder",
324 "test_logs",
325 "--example",
326 "--max-messages",
327 "1",
328 ]);
329 let exchanges: Vec<String> = vec!["uniswap_v2".to_string()];
330 assert_eq!(args.tycho_url, "localhost:5000");
331 assert_eq!(args.exchange, exchanges);
332 assert_eq!(args.min_tvl, 3000);
333 assert_eq!(args.block_time, 50);
334 assert_eq!(args.timeout, 5);
335 assert_eq!(args.log_folder, "test_logs");
336 assert_eq!(args.max_messages, Some(1));
337 assert!(args.example);
338 }
339}