1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use clap::Parser;
4use tracing::{debug, info};
5use tracing_appender::rolling;
6use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
7
8use crate::{
9 deltas::DeltasClient,
10 feed::{
11 component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
12 BlockSynchronizer,
13 },
14 rpc::RPCClient,
15 HttpRPCClient, WsDeltasClient,
16};
17
18#[derive(Parser, Debug, Clone, PartialEq)]
19#[clap(version = env!("CARGO_PKG_VERSION"))]
20struct CliArgs {
21 #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
23 tycho_url: String,
24
25 #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
28 auth_key: Option<String>,
29
30 #[clap(long)]
32 no_tls: bool,
33
34 #[clap(long, default_value = "ethereum")]
36 pub chain: String,
37
38 #[clap(long, number_of_values = 1)]
40 exchange: Vec<String>,
41
42 #[clap(long, default_value = "10")]
45 min_tvl: u32,
46
47 #[clap(long)]
50 remove_tvl_threshold: Option<u32>,
51
52 #[clap(long)]
55 add_tvl_threshold: Option<u32>,
56
57 #[clap(long, default_value = "600")]
64 block_time: u64,
65
66 #[clap(long, default_value = "1")]
69 timeout: u64,
70
71 #[clap(long, default_value = "logs")]
73 log_folder: String,
74
75 #[clap(long)]
77 example: bool,
78
79 #[clap(long)]
82 no_state: bool,
83
84 #[clap(short='n', long, default_value=None)]
88 max_messages: Option<usize>,
89
90 #[clap(long, default_value = "10")]
93 max_missed_blocks: u64,
94}
95
96impl CliArgs {
97 fn validate(&self) -> Result<(), String> {
98 if self.remove_tvl_threshold.is_some() && self.add_tvl_threshold.is_none() {
99 return Err("Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string());
100 }
101 if self.remove_tvl_threshold.is_none() && self.add_tvl_threshold.is_some() {
102 return Err("Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string());
103 }
104 Ok(())
105 }
106}
107
108pub async fn run_cli() {
109 let args: CliArgs = CliArgs::parse();
111 if let Err(e) = args.validate() {
112 panic!("{}", e);
113 }
114
115 let (non_blocking, _guard) =
117 tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
118 let subscriber = tracing_subscriber::fmt()
119 .with_env_filter(
120 tracing_subscriber::EnvFilter::try_from_default_env()
121 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
122 )
123 .with_writer(non_blocking)
124 .finish();
125
126 tracing::subscriber::set_global_default(subscriber)
127 .expect("Failed to set up logging subscriber");
128
129 if args.example {
131 let exchanges = vec![
139 (
140 "uniswap_v3".to_string(),
141 Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
142 ),
143 (
144 "uniswap_v2".to_string(),
145 Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
146 ),
147 ];
148 run(exchanges, args).await;
149 return;
150 }
151
152 let exchanges: Vec<(String, Option<String>)> = args
154 .exchange
155 .iter()
156 .filter_map(|e| {
157 if e.contains('-') {
158 let parts: Vec<&str> = e.split('-').collect();
159 if parts.len() == 2 {
160 Some((parts[0].to_string(), Some(parts[1].to_string())))
161 } else {
162 tracing::warn!("Ignoring invalid exchange format: {}", e);
163 None
164 }
165 } else {
166 Some((e.to_string(), None))
167 }
168 })
169 .collect();
170
171 tracing::info!("Running with exchanges: {:?}", exchanges);
172
173 run(exchanges, args).await;
174}
175
176async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) {
177 let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
179 info!("Using non-secure connection: ws:// and http://");
180 let tycho_ws_url = format!("ws://{}", &args.tycho_url);
181 let tycho_rpc_url = format!("http://{}", &args.tycho_url);
182 (tycho_ws_url, tycho_rpc_url)
183 } else {
184 info!("Using secure connection: wss:// and https://");
185 let tycho_ws_url = format!("wss://{}", &args.tycho_url);
186 let tycho_rpc_url = format!("https://{}", &args.tycho_url);
187 (tycho_ws_url, tycho_rpc_url)
188 };
189
190 let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref()).unwrap();
191 let rpc_client = HttpRPCClient::new(&tycho_rpc_url, args.auth_key.as_deref()).unwrap();
192 let chain =
193 Chain::from_str(&args.chain).unwrap_or_else(|_| panic!("Unknown chain {}", &args.chain));
194 let ws_jh = ws_client
195 .connect()
196 .await
197 .expect("ws client connection error");
198
199 let mut block_sync = BlockSynchronizer::new(
200 Duration::from_secs(args.block_time),
201 Duration::from_secs(args.timeout),
202 args.max_missed_blocks,
203 );
204
205 if let Some(mm) = &args.max_messages {
206 block_sync.max_messages(*mm);
207 }
208
209 let available_protocols_set = rpc_client
210 .get_protocol_systems(&ProtocolSystemsRequestBody {
211 chain,
212 pagination: PaginationParams { page: 0, page_size: 100 },
213 })
214 .await
215 .unwrap()
216 .protocol_systems
217 .into_iter()
218 .collect::<HashSet<_>>();
219
220 let requested_protocol_set = exchanges
221 .iter()
222 .map(|(name, _)| name.clone())
223 .collect::<HashSet<_>>();
224
225 let not_requested_protocols = available_protocols_set
226 .difference(&requested_protocol_set)
227 .cloned()
228 .collect::<Vec<_>>();
229
230 if !not_requested_protocols.is_empty() {
231 tracing::info!("Other available protocols: {}", not_requested_protocols.join(", "));
232 }
233
234 for (name, address) in exchanges {
235 debug!("Registering exchange: {}", name);
236 let id = ExtractorIdentity { chain, name: name.clone() };
237 let filter = if address.is_some() {
238 ComponentFilter::Ids(vec![address.unwrap()])
239 } else if let (Some(remove_tvl), Some(add_tvl)) =
240 (args.remove_tvl_threshold, args.add_tvl_threshold)
241 {
242 ComponentFilter::with_tvl_range(remove_tvl as f64, add_tvl as f64)
243 } else {
244 ComponentFilter::with_tvl_range(args.min_tvl as f64, args.min_tvl as f64)
245 };
246 let sync = ProtocolStateSynchronizer::new(
247 id.clone(),
248 true,
249 filter,
250 3,
251 !args.no_state,
252 rpc_client.clone(),
253 ws_client.clone(),
254 args.block_time + args.timeout,
255 );
256 block_sync = block_sync.register_synchronizer(id, sync);
257 }
258
259 let (sync_jh, mut rx) = block_sync
260 .run()
261 .await
262 .expect("block sync start error");
263
264 let msg_printer = tokio::spawn(async move {
265 while let Some(msg) = rx.recv().await {
266 if let Ok(msg_json) = serde_json::to_string(&msg) {
267 println!("{}", msg_json);
268 } else {
269 tracing::error!("Failed to serialize FeedMessage");
270 }
271 }
272 });
273
274 tokio::select! {
276 res = ws_jh => {
277 let _ = res.expect("WebSocket connection dropped unexpectedly");
278 }
279 res = sync_jh => {
280 res.expect("BlockSynchronizer stopped unexpectedly");
281 }
282 res = msg_printer => {
283 res.expect("Message printer stopped unexpectedly");
284 }
285 }
286
287 tracing::debug!("RX closed");
288}
289
#[cfg(test)]
mod cli_tests {
    use clap::Parser;

    use super::CliArgs;

    /// Explicitly passed options override their defaults, while options that are not
    /// passed keep theirs.
    // Argument parsing is fully synchronous, so a plain `#[test]` is enough; no async
    // runtime has to be started.
    #[test]
    fn test_cli_args() {
        let args = CliArgs::parse_from([
            "tycho-client",
            "--tycho-url",
            "localhost:5000",
            "--exchange",
            "uniswap_v2",
            "--min-tvl",
            "3000",
            "--block-time",
            "50",
            "--timeout",
            "5",
            "--log-folder",
            "test_logs",
            "--example",
            "--max-messages",
            "1",
        ]);
        let exchanges: Vec<String> = vec!["uniswap_v2".to_string()];

        // Values given on the command line.
        assert_eq!(args.tycho_url, "localhost:5000");
        assert_eq!(args.exchange, exchanges);
        assert_eq!(args.min_tvl, 3000);
        assert_eq!(args.block_time, 50);
        assert_eq!(args.timeout, 5);
        assert_eq!(args.log_folder, "test_logs");
        assert_eq!(args.max_messages, Some(1));
        assert!(args.example);

        // Options that were not passed fall back to their declared defaults. `auth_key`
        // is left out on purpose because it can be injected through the environment.
        assert_eq!(args.chain, "ethereum");
        assert_eq!(args.max_missed_blocks, 10);
        assert_eq!(args.remove_tvl_threshold, None);
        assert_eq!(args.add_tvl_threshold, None);
        assert!(!args.no_tls);
        assert!(!args.no_state);

        assert!(args.validate().is_ok());
    }

    /// The two TVL thresholds are only accepted as a pair.
    #[test]
    fn test_validate_requires_both_tvl_thresholds() {
        let neither = CliArgs::parse_from(["tycho-client"]);
        assert!(neither.validate().is_ok());

        let both = CliArgs::parse_from([
            "tycho-client",
            "--remove-tvl-threshold",
            "5",
            "--add-tvl-threshold",
            "10",
        ]);
        assert!(both.validate().is_ok());

        let only_remove = CliArgs::parse_from(["tycho-client", "--remove-tvl-threshold", "5"]);
        assert!(only_remove.validate().is_err());

        let only_add = CliArgs::parse_from(["tycho-client", "--add-tvl-threshold", "10"]);
        assert!(only_add.validate().is_err());
    }
}