tycho_client/
cli.rs

1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use clap::Parser;
4use tracing::{debug, info};
5use tracing_appender::rolling;
6use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
7
8use crate::{
9    deltas::DeltasClient,
10    feed::{
11        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
12        BlockSynchronizer,
13    },
14    rpc::RPCClient,
15    HttpRPCClient, WsDeltasClient,
16};
17
18#[derive(Parser, Debug, Clone, PartialEq)]
19#[clap(version = env!("CARGO_PKG_VERSION"))]
20struct CliArgs {
21    /// Tycho server URL, without protocol. Example: localhost:4242
22    #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
23    tycho_url: String,
24
25    /// Tycho gateway API key, used as authentication for both websocket and http connections.
26    /// Can be set with TYCHO_AUTH_TOKEN env variable.
27    #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
28    auth_key: Option<String>,
29
30    /// If set, use unsecured transports: http and ws instead of https and wss.
31    #[clap(long)]
32    no_tls: bool,
33
34    /// The blockchain to index on
35    #[clap(long, default_value = "ethereum")]
36    pub chain: String,
37
38    /// Specifies exchanges and optionally a pool address in the format name:address
39    #[clap(long, number_of_values = 1)]
40    exchange: Vec<String>,
41
42    /// Specifies the minimum TVL to filter the components. Denoted in the native token (e.g.
43    /// Mainnet -> ETH). Ignored if addresses or range tvl values are provided.
44    #[clap(long, default_value = "10")]
45    min_tvl: u32,
46
47    /// Specifies the lower bound of the TVL threshold range. Denoted in the native token (e.g.
48    /// Mainnet -> ETH). Components below this TVL will be removed from tracking.
49    #[clap(long)]
50    remove_tvl_threshold: Option<u32>,
51
52    /// Specifies the upper bound of the TVL threshold range. Denoted in the native token (e.g.
53    /// Mainnet -> ETH). Components above this TVL will be added to tracking.
54    #[clap(long)]
55    add_tvl_threshold: Option<u32>,
56
57    /// Expected block time in seconds. For blockchains with consistent intervals,
58    /// set to the average block time (e.g., "600" for a 10-minute interval).
59    ///
60    /// Adjusting `block_time` helps balance efficiency and responsiveness:
61    /// - **Low values**: Increase sync frequency but may waste resources on retries.
62    /// - **High values**: Reduce sync frequency but may delay updates on faster chains.
63    #[clap(long, default_value = "600")]
64    block_time: u64,
65
66    /// Maximum wait time in seconds beyond the block time. Useful for handling
67    /// chains with variable block intervals or network delays.
68    #[clap(long, default_value = "1")]
69    timeout: u64,
70
71    /// Logging folder path.
72    #[clap(long, default_value = "logs")]
73    log_folder: String,
74
75    /// Run the example on a single block with UniswapV2 and UniswapV3.
76    #[clap(long)]
77    example: bool,
78
79    /// If set, only component and tokens are streamed, any snapshots or state updates
80    /// are omitted from the stream.
81    #[clap(long)]
82    no_state: bool,
83
84    /// Maximum amount of messages to process before exiting. Useful for debugging e.g.
85    /// to easily get a state sync messages for a fixture. Alternatively this may be
86    /// used to trigger a regular restart or resync.
87    #[clap(short='n', long, default_value=None)]
88    max_messages: Option<usize>,
89
90    /// Maximum blocks an exchange can be absent for before it is marked as stale. Used
91    /// in conjunction with block_time to calculate a timeout: block_time * max_missed_blocks.
92    #[clap(long, default_value = "10")]
93    max_missed_blocks: u64,
94}
95
96impl CliArgs {
97    fn validate(&self) -> Result<(), String> {
98        if self.remove_tvl_threshold.is_some() && self.add_tvl_threshold.is_none() {
99            return Err("Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string());
100        }
101        if self.remove_tvl_threshold.is_none() && self.add_tvl_threshold.is_some() {
102            return Err("Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string());
103        }
104        Ok(())
105    }
106}
107
108pub async fn run_cli() {
109    // Parse CLI Args
110    let args: CliArgs = CliArgs::parse();
111    if let Err(e) = args.validate() {
112        panic!("{}", e);
113    }
114
115    // Setup Logging
116    let (non_blocking, _guard) =
117        tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
118    let subscriber = tracing_subscriber::fmt()
119        .with_env_filter(
120            tracing_subscriber::EnvFilter::try_from_default_env()
121                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
122        )
123        .with_writer(non_blocking)
124        .finish();
125
126    tracing::subscriber::set_global_default(subscriber)
127        .expect("Failed to set up logging subscriber");
128
129    // Runs example if flag is set.
130    if args.example {
131        // Run a simple example of a block synchronizer.
132        //
133        // You need to port-forward tycho before running this:
134        //
135        // ```bash
136        // kubectl port-forward -n dev-tycho deploy/tycho-indexer 8888:4242
137        // ```
138        let exchanges = vec![
139            (
140                "uniswap_v3".to_string(),
141                Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
142            ),
143            (
144                "uniswap_v2".to_string(),
145                Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
146            ),
147        ];
148        run(exchanges, args).await;
149        return;
150    }
151
152    // Parse exchange name and addresses from name:address format.
153    let exchanges: Vec<(String, Option<String>)> = args
154        .exchange
155        .iter()
156        .filter_map(|e| {
157            if e.contains('-') {
158                let parts: Vec<&str> = e.split('-').collect();
159                if parts.len() == 2 {
160                    Some((parts[0].to_string(), Some(parts[1].to_string())))
161                } else {
162                    tracing::warn!("Ignoring invalid exchange format: {}", e);
163                    None
164                }
165            } else {
166                Some((e.to_string(), None))
167            }
168        })
169        .collect();
170
171    tracing::info!("Running with exchanges: {:?}", exchanges);
172
173    run(exchanges, args).await;
174}
175
176async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) {
177    //TODO: remove "or args.auth_key.is_none()" when our internal client use the no_tls flag
178    let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
179        info!("Using non-secure connection: ws:// and http://");
180        let tycho_ws_url = format!("ws://{}", &args.tycho_url);
181        let tycho_rpc_url = format!("http://{}", &args.tycho_url);
182        (tycho_ws_url, tycho_rpc_url)
183    } else {
184        info!("Using secure connection: wss:// and https://");
185        let tycho_ws_url = format!("wss://{}", &args.tycho_url);
186        let tycho_rpc_url = format!("https://{}", &args.tycho_url);
187        (tycho_ws_url, tycho_rpc_url)
188    };
189
190    let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref()).unwrap();
191    let rpc_client = HttpRPCClient::new(&tycho_rpc_url, args.auth_key.as_deref()).unwrap();
192    let chain =
193        Chain::from_str(&args.chain).unwrap_or_else(|_| panic!("Unknown chain {}", &args.chain));
194    let ws_jh = ws_client
195        .connect()
196        .await
197        .expect("ws client connection error");
198
199    let mut block_sync = BlockSynchronizer::new(
200        Duration::from_secs(args.block_time),
201        Duration::from_secs(args.timeout),
202        args.max_missed_blocks,
203    );
204
205    if let Some(mm) = &args.max_messages {
206        block_sync.max_messages(*mm);
207    }
208
209    let available_protocols_set = rpc_client
210        .get_protocol_systems(&ProtocolSystemsRequestBody {
211            chain,
212            pagination: PaginationParams { page: 0, page_size: 100 },
213        })
214        .await
215        .unwrap()
216        .protocol_systems
217        .into_iter()
218        .collect::<HashSet<_>>();
219
220    let requested_protocol_set = exchanges
221        .iter()
222        .map(|(name, _)| name.clone())
223        .collect::<HashSet<_>>();
224
225    let not_requested_protocols = available_protocols_set
226        .difference(&requested_protocol_set)
227        .cloned()
228        .collect::<Vec<_>>();
229
230    if !not_requested_protocols.is_empty() {
231        tracing::info!("Other available protocols: {}", not_requested_protocols.join(", "));
232    }
233
234    for (name, address) in exchanges {
235        debug!("Registering exchange: {}", name);
236        let id = ExtractorIdentity { chain, name: name.clone() };
237        let filter = if address.is_some() {
238            ComponentFilter::Ids(vec![address.unwrap()])
239        } else if let (Some(remove_tvl), Some(add_tvl)) =
240            (args.remove_tvl_threshold, args.add_tvl_threshold)
241        {
242            ComponentFilter::with_tvl_range(remove_tvl as f64, add_tvl as f64)
243        } else {
244            ComponentFilter::with_tvl_range(args.min_tvl as f64, args.min_tvl as f64)
245        };
246        let sync = ProtocolStateSynchronizer::new(
247            id.clone(),
248            true,
249            filter,
250            3,
251            !args.no_state,
252            rpc_client.clone(),
253            ws_client.clone(),
254            args.block_time + args.timeout,
255        );
256        block_sync = block_sync.register_synchronizer(id, sync);
257    }
258
259    let (sync_jh, mut rx) = block_sync
260        .run()
261        .await
262        .expect("block sync start error");
263
264    let msg_printer = tokio::spawn(async move {
265        while let Some(msg) = rx.recv().await {
266            if let Ok(msg_json) = serde_json::to_string(&msg) {
267                println!("{msg_json}");
268            } else {
269                tracing::error!("Failed to serialize FeedMessage");
270            }
271        }
272    });
273
274    // Monitor the WebSocket, BlockSynchronizer and message printer futures.
275    tokio::select! {
276        res = ws_jh => {
277            let _ = res.expect("WebSocket connection dropped unexpectedly");
278        }
279        res = sync_jh => {
280            res.expect("BlockSynchronizer stopped unexpectedly");
281        }
282        res = msg_printer => {
283            res.expect("Message printer stopped unexpectedly");
284        }
285    }
286
287    tracing::debug!("RX closed");
288}
289
#[cfg(test)]
mod cli_tests {
    use clap::Parser;

    use super::CliArgs;

    /// Parses a representative command line and checks that each supplied flag ends up in the
    /// matching `CliArgs` field.
    #[tokio::test]
    async fn test_cli_args() {
        // The first element plays the role of the binary name.
        let argv = [
            "tycho-client",
            "--tycho-url",
            "localhost:5000",
            "--exchange",
            "uniswap_v2",
            "--min-tvl",
            "3000",
            "--block-time",
            "50",
            "--timeout",
            "5",
            "--log-folder",
            "test_logs",
            "--example",
            "--max-messages",
            "1",
        ];
        let args = CliArgs::parse_from(argv);

        // Connection and selection arguments.
        assert_eq!(args.tycho_url, "localhost:5000");
        assert_eq!(args.exchange, vec!["uniswap_v2".to_string()]);
        assert_eq!(args.min_tvl, 3000);

        // Timing arguments.
        assert_eq!(args.block_time, 50);
        assert_eq!(args.timeout, 5);

        // Miscellaneous arguments.
        assert_eq!(args.log_folder, "test_logs");
        assert_eq!(args.max_messages, Some(1));
        assert!(args.example);
    }
}