tycho_client/
cli.rs

1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use clap::Parser;
4use tracing::{debug, info};
5use tracing_appender::rolling;
6use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
7
8use crate::{
9    deltas::DeltasClient,
10    feed::{
11        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
12        BlockSynchronizer,
13    },
14    rpc::RPCClient,
15    HttpRPCClient, WsDeltasClient,
16};
17
18#[derive(Parser, Debug, Clone, PartialEq)]
19#[clap(version = env!("CARGO_PKG_VERSION"))]
20struct CliArgs {
21    /// Tycho server URL, without protocol. Example: localhost:4242
22    #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
23    tycho_url: String,
24
25    /// Tycho gateway API key, used as authentication for both websocket and http connections.
26    /// Can be set with TYCHO_AUTH_TOKEN env variable.
27    #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
28    auth_key: Option<String>,
29
30    /// If set, use unsecured transports: http and ws instead of https and wss.
31    #[clap(long)]
32    no_tls: bool,
33
34    /// The blockchain to index on
35    #[clap(long, default_value = "ethereum")]
36    pub chain: String,
37
38    /// Specifies exchanges and optionally a pool address in the format name:address
39    #[clap(long, number_of_values = 1)]
40    exchange: Vec<String>,
41
42    /// Specifies the minimum TVL to filter the components. Denoted in the native token (e.g.
43    /// Mainnet -> ETH). Ignored if addresses or range tvl values are provided.
44    #[clap(long, default_value = "10")]
45    min_tvl: u32,
46
47    /// Specifies the lower bound of the TVL threshold range. Denoted in the native token (e.g.
48    /// Mainnet -> ETH). Components below this TVL will be removed from tracking.
49    #[clap(long)]
50    remove_tvl_threshold: Option<u32>,
51
52    /// Specifies the upper bound of the TVL threshold range. Denoted in the native token (e.g.
53    /// Mainnet -> ETH). Components above this TVL will be added to tracking.
54    #[clap(long)]
55    add_tvl_threshold: Option<u32>,
56
57    /// Expected block time in seconds. For blockchains with consistent intervals,
58    /// set to the average block time (e.g., "600" for a 10-minute interval).
59    ///
60    /// Adjusting `block_time` helps balance efficiency and responsiveness:
61    /// - **Low values**: Increase sync frequency but may waste resources on retries.
62    /// - **High values**: Reduce sync frequency but may delay updates on faster chains.
63    #[clap(long, default_value = "600")]
64    block_time: u64,
65
66    /// Maximum wait time in seconds beyond the block time. Useful for handling
67    /// chains with variable block intervals or network delays.
68    #[clap(long, default_value = "1")]
69    timeout: u64,
70
71    /// Logging folder path.
72    #[clap(long, default_value = "logs")]
73    log_folder: String,
74
75    /// Run the example on a single block with UniswapV2 and UniswapV3.
76    #[clap(long)]
77    example: bool,
78
79    /// If set, only component and tokens are streamed, any snapshots or state updates
80    /// are omitted from the stream.
81    #[clap(long)]
82    no_state: bool,
83
84    /// Maximum amount of messages to process before exiting. Useful for debugging e.g.
85    /// to easily get a state sync messages for a fixture. Alternatively this may be
86    /// used to trigger a regular restart or resync.
87    #[clap(short='n', long, default_value=None)]
88    max_messages: Option<usize>,
89
90    /// Maximum blocks an exchange can be absent for before it is marked as stale. Used
91    /// in conjunction with block_time to calculate a timeout: block_time * max_missed_blocks.
92    #[clap(long, default_value = "10")]
93    max_missed_blocks: u64,
94
95    /// If set, the synchronizer will include TVL in the messages.
96    /// Enabling this option will increase the number of network requests made during start-up,
97    //  which may result in increased start-up latency.
98    #[clap(long)]
99    include_tvl: bool,
100}
101
102impl CliArgs {
103    fn validate(&self) -> Result<(), String> {
104        if self.remove_tvl_threshold.is_some() && self.add_tvl_threshold.is_none() {
105            return Err("Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string());
106        }
107        if self.remove_tvl_threshold.is_none() && self.add_tvl_threshold.is_some() {
108            return Err("Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string());
109        }
110        Ok(())
111    }
112}
113
114pub async fn run_cli() {
115    // Parse CLI Args
116    let args: CliArgs = CliArgs::parse();
117    if let Err(e) = args.validate() {
118        panic!("{}", e);
119    }
120
121    // Setup Logging
122    let (non_blocking, _guard) =
123        tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
124    let subscriber = tracing_subscriber::fmt()
125        .with_env_filter(
126            tracing_subscriber::EnvFilter::try_from_default_env()
127                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
128        )
129        .with_writer(non_blocking)
130        .finish();
131
132    tracing::subscriber::set_global_default(subscriber)
133        .expect("Failed to set up logging subscriber");
134
135    // Runs example if flag is set.
136    if args.example {
137        // Run a simple example of a block synchronizer.
138        //
139        // You need to port-forward tycho before running this:
140        //
141        // ```bash
142        // kubectl port-forward -n dev-tycho deploy/tycho-indexer 8888:4242
143        // ```
144        let exchanges = vec![
145            (
146                "uniswap_v3".to_string(),
147                Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
148            ),
149            (
150                "uniswap_v2".to_string(),
151                Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
152            ),
153        ];
154        run(exchanges, args).await;
155        return;
156    }
157
158    // Parse exchange name and addresses from name:address format.
159    let exchanges: Vec<(String, Option<String>)> = args
160        .exchange
161        .iter()
162        .filter_map(|e| {
163            if e.contains('-') {
164                let parts: Vec<&str> = e.split('-').collect();
165                if parts.len() == 2 {
166                    Some((parts[0].to_string(), Some(parts[1].to_string())))
167                } else {
168                    tracing::warn!("Ignoring invalid exchange format: {}", e);
169                    None
170                }
171            } else {
172                Some((e.to_string(), None))
173            }
174        })
175        .collect();
176
177    tracing::info!("Running with exchanges: {:?}", exchanges);
178
179    run(exchanges, args).await;
180}
181
182async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) {
183    //TODO: remove "or args.auth_key.is_none()" when our internal client use the no_tls flag
184    let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
185        info!("Using non-secure connection: ws:// and http://");
186        let tycho_ws_url = format!("ws://{}", &args.tycho_url);
187        let tycho_rpc_url = format!("http://{}", &args.tycho_url);
188        (tycho_ws_url, tycho_rpc_url)
189    } else {
190        info!("Using secure connection: wss:// and https://");
191        let tycho_ws_url = format!("wss://{}", &args.tycho_url);
192        let tycho_rpc_url = format!("https://{}", &args.tycho_url);
193        (tycho_ws_url, tycho_rpc_url)
194    };
195
196    let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref()).unwrap();
197    let rpc_client = HttpRPCClient::new(&tycho_rpc_url, args.auth_key.as_deref()).unwrap();
198    let chain =
199        Chain::from_str(&args.chain).unwrap_or_else(|_| panic!("Unknown chain {}", &args.chain));
200    let ws_jh = ws_client
201        .connect()
202        .await
203        .expect("ws client connection error");
204
205    let mut block_sync = BlockSynchronizer::new(
206        Duration::from_secs(args.block_time),
207        Duration::from_secs(args.timeout),
208        args.max_missed_blocks,
209    );
210
211    if let Some(mm) = &args.max_messages {
212        block_sync.max_messages(*mm);
213    }
214
215    let available_protocols_set = rpc_client
216        .get_protocol_systems(&ProtocolSystemsRequestBody {
217            chain,
218            pagination: PaginationParams { page: 0, page_size: 100 },
219        })
220        .await
221        .unwrap()
222        .protocol_systems
223        .into_iter()
224        .collect::<HashSet<_>>();
225
226    let requested_protocol_set = exchanges
227        .iter()
228        .map(|(name, _)| name.clone())
229        .collect::<HashSet<_>>();
230
231    let not_requested_protocols = available_protocols_set
232        .difference(&requested_protocol_set)
233        .cloned()
234        .collect::<Vec<_>>();
235
236    if !not_requested_protocols.is_empty() {
237        tracing::info!("Other available protocols: {}", not_requested_protocols.join(", "));
238    }
239
240    for (name, address) in exchanges {
241        debug!("Registering exchange: {}", name);
242        let id = ExtractorIdentity { chain, name: name.clone() };
243        let filter = if address.is_some() {
244            ComponentFilter::Ids(vec![address.unwrap()])
245        } else if let (Some(remove_tvl), Some(add_tvl)) =
246            (args.remove_tvl_threshold, args.add_tvl_threshold)
247        {
248            ComponentFilter::with_tvl_range(remove_tvl as f64, add_tvl as f64)
249        } else {
250            ComponentFilter::with_tvl_range(args.min_tvl as f64, args.min_tvl as f64)
251        };
252        let sync = ProtocolStateSynchronizer::new(
253            id.clone(),
254            true,
255            filter,
256            3,
257            !args.no_state,
258            args.include_tvl,
259            rpc_client.clone(),
260            ws_client.clone(),
261            args.block_time + args.timeout,
262        );
263        block_sync = block_sync.register_synchronizer(id, sync);
264    }
265
266    let (sync_jh, mut rx) = block_sync
267        .run()
268        .await
269        .expect("block sync start error");
270
271    let msg_printer = tokio::spawn(async move {
272        while let Some(msg) = rx.recv().await {
273            if let Ok(msg_json) = serde_json::to_string(&msg) {
274                println!("{msg_json}");
275            } else {
276                tracing::error!("Failed to serialize FeedMessage");
277            }
278        }
279    });
280
281    // Monitor the WebSocket, BlockSynchronizer and message printer futures.
282    tokio::select! {
283        res = ws_jh => {
284            let _ = res.expect("WebSocket connection dropped unexpectedly");
285        }
286        res = sync_jh => {
287            res.expect("BlockSynchronizer stopped unexpectedly");
288        }
289        res = msg_printer => {
290            res.expect("Message printer stopped unexpectedly");
291        }
292    }
293
294    tracing::debug!("RX closed");
295}
296
297#[cfg(test)]
298mod cli_tests {
299    use clap::Parser;
300
301    use super::CliArgs;
302
303    #[tokio::test]
304    async fn test_cli_args() {
305        let args = CliArgs::parse_from([
306            "tycho-client",
307            "--tycho-url",
308            "localhost:5000",
309            "--exchange",
310            "uniswap_v2",
311            "--min-tvl",
312            "3000",
313            "--block-time",
314            "50",
315            "--timeout",
316            "5",
317            "--log-folder",
318            "test_logs",
319            "--example",
320            "--max-messages",
321            "1",
322        ]);
323        let exchanges: Vec<String> = vec!["uniswap_v2".to_string()];
324        assert_eq!(args.tycho_url, "localhost:5000");
325        assert_eq!(args.exchange, exchanges);
326        assert_eq!(args.min_tvl, 3000);
327        assert_eq!(args.block_time, 50);
328        assert_eq!(args.timeout, 5);
329        assert_eq!(args.log_folder, "test_logs");
330        assert_eq!(args.max_messages, Some(1));
331        assert!(args.example);
332    }
333}