tycho_client/
cli.rs

1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use clap::Parser;
4use tracing::{debug, error, info, warn};
5use tracing_appender::rolling;
6use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
7
8use crate::{
9    deltas::DeltasClient,
10    feed::{
11        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
12        BlockSynchronizer,
13    },
14    rpc::RPCClient,
15    HttpRPCClient, WsDeltasClient,
16};
17
18#[derive(Parser, Debug, Clone, PartialEq)]
19#[clap(version = env!("CARGO_PKG_VERSION"))]
20struct CliArgs {
21    /// Tycho server URL, without protocol. Example: localhost:4242
22    #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
23    tycho_url: String,
24
25    /// Tycho gateway API key, used as authentication for both websocket and http connections.
26    /// Can be set with TYCHO_AUTH_TOKEN env variable.
27    #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
28    auth_key: Option<String>,
29
30    /// If set, use unsecured transports: http and ws instead of https and wss.
31    #[clap(long)]
32    no_tls: bool,
33
34    /// The blockchain to index on
35    #[clap(long, default_value = "ethereum")]
36    pub chain: String,
37
38    /// Specifies exchanges and optionally a pool address in the format name:address
39    #[clap(long, number_of_values = 1)]
40    exchange: Vec<String>,
41
42    /// Specifies the minimum TVL to filter the components. Denoted in the native token (e.g.
43    /// Mainnet -> ETH). Ignored if addresses or range tvl values are provided.
44    #[clap(long, default_value = "10")]
45    min_tvl: u32,
46
47    /// Specifies the lower bound of the TVL threshold range. Denoted in the native token (e.g.
48    /// Mainnet -> ETH). Components below this TVL will be removed from tracking.
49    #[clap(long)]
50    remove_tvl_threshold: Option<u32>,
51
52    /// Specifies the upper bound of the TVL threshold range. Denoted in the native token (e.g.
53    /// Mainnet -> ETH). Components above this TVL will be added to tracking.
54    #[clap(long)]
55    add_tvl_threshold: Option<u32>,
56
57    /// Expected block time in seconds. For blockchains with consistent intervals,
58    /// set to the average block time (e.g., "600" for a 10-minute interval).
59    ///
60    /// Adjusting `block_time` helps balance efficiency and responsiveness:
61    /// - **Low values**: Increase sync frequency but may waste resources on retries.
62    /// - **High values**: Reduce sync frequency but may delay updates on faster chains.
63    #[clap(long, default_value = "600")]
64    block_time: u64,
65
66    /// Maximum wait time in seconds beyond the block time. Useful for handling
67    /// chains with variable block intervals or network delays.
68    #[clap(long, default_value = "1")]
69    timeout: u64,
70
71    /// Logging folder path.
72    #[clap(long, default_value = "logs")]
73    log_folder: String,
74
75    /// Run the example on a single block with UniswapV2 and UniswapV3.
76    #[clap(long)]
77    example: bool,
78
79    /// If set, only component and tokens are streamed, any snapshots or state updates
80    /// are omitted from the stream.
81    #[clap(long)]
82    no_state: bool,
83
84    /// Maximum amount of messages to process before exiting. Useful for debugging e.g.
85    /// to easily get a state sync messages for a fixture. Alternatively this may be
86    /// used to trigger a regular restart or resync.
87    #[clap(short='n', long, default_value=None)]
88    max_messages: Option<usize>,
89
90    /// Maximum blocks an exchange can be absent for before it is marked as stale. Used
91    /// in conjunction with block_time to calculate a timeout: block_time * max_missed_blocks.
92    #[clap(long, default_value = "10")]
93    max_missed_blocks: u64,
94
95    /// If set, the synchronizer will include TVL in the messages.
96    /// Enabling this option will increase the number of network requests made during start-up,
97    //  which may result in increased start-up latency.
98    #[clap(long)]
99    include_tvl: bool,
100}
101
102impl CliArgs {
103    fn validate(&self) -> Result<(), String> {
104        // TVL thresholds must be set together - either both or neither
105        if self.remove_tvl_threshold.is_some() != self.add_tvl_threshold.is_some() {
106            return Err("Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string());
107        }
108        Ok(())
109    }
110}
111
112pub async fn run_cli() -> Result<(), String> {
113    // Parse CLI Args
114    let args: CliArgs = CliArgs::parse();
115    args.validate()?;
116
117    // Setup Logging
118    let (non_blocking, _guard) =
119        tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
120    let subscriber = tracing_subscriber::fmt()
121        .with_env_filter(
122            tracing_subscriber::EnvFilter::try_from_default_env()
123                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
124        )
125        .with_writer(non_blocking)
126        .finish();
127
128    tracing::subscriber::set_global_default(subscriber)
129        .map_err(|e| format!("Failed to set up logging subscriber: {e}"))?;
130
131    // Runs example if flag is set.
132    if args.example {
133        // Run a simple example of a block synchronizer.
134        //
135        // You need to port-forward tycho before running this:
136        //
137        // ```bash
138        // kubectl port-forward -n dev-tycho deploy/tycho-indexer 8888:4242
139        // ```
140        let exchanges = vec![
141            (
142                "uniswap_v3".to_string(),
143                Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
144            ),
145            (
146                "uniswap_v2".to_string(),
147                Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
148            ),
149        ];
150        run(exchanges, args).await?;
151        return Ok(());
152    }
153
154    // Parse exchange name and addresses from name:address format.
155    let exchanges: Vec<(String, Option<String>)> = args
156        .exchange
157        .iter()
158        .filter_map(|e| {
159            if e.contains('-') {
160                let parts: Vec<&str> = e.split('-').collect();
161                if parts.len() == 2 {
162                    Some((parts[0].to_string(), Some(parts[1].to_string())))
163                } else {
164                    warn!("Ignoring invalid exchange format: {}", e);
165                    None
166                }
167            } else {
168                Some((e.to_string(), None))
169            }
170        })
171        .collect();
172
173    info!("Running with exchanges: {:?}", exchanges);
174
175    run(exchanges, args).await?;
176    Ok(())
177}
178
179async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<(), String> {
180    //TODO: remove "or args.auth_key.is_none()" when our internal client use the no_tls flag
181    let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
182        info!("Using non-secure connection: ws:// and http://");
183        let tycho_ws_url = format!("ws://{url}", url = &args.tycho_url);
184        let tycho_rpc_url = format!("http://{url}", url = &args.tycho_url);
185        (tycho_ws_url, tycho_rpc_url)
186    } else {
187        info!("Using secure connection: wss:// and https://");
188        let tycho_ws_url = format!("wss://{url}", url = &args.tycho_url);
189        let tycho_rpc_url = format!("https://{url}", url = &args.tycho_url);
190        (tycho_ws_url, tycho_rpc_url)
191    };
192
193    let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref())
194        .map_err(|e| format!("Failed to create WebSocket client: {e}"))?;
195    let rpc_client = HttpRPCClient::new(&tycho_rpc_url, args.auth_key.as_deref())
196        .map_err(|e| format!("Failed to create RPC client: {e}"))?;
197    let chain = Chain::from_str(&args.chain)
198        .map_err(|_| format!("Unknown chain: {chain}", chain = &args.chain))?;
199    let ws_jh = ws_client
200        .connect()
201        .await
202        .map_err(|e| format!("WebSocket client connection error: {e}"))?;
203
204    let mut block_sync = BlockSynchronizer::new(
205        Duration::from_secs(args.block_time),
206        Duration::from_secs(args.timeout),
207        args.max_missed_blocks,
208    );
209
210    if let Some(mm) = &args.max_messages {
211        block_sync.max_messages(*mm);
212    }
213
214    let available_protocols_set = rpc_client
215        .get_protocol_systems(&ProtocolSystemsRequestBody {
216            chain,
217            pagination: PaginationParams { page: 0, page_size: 100 },
218        })
219        .await
220        .map_err(|e| format!("Failed to get protocol systems: {e}"))?
221        .protocol_systems
222        .into_iter()
223        .collect::<HashSet<_>>();
224
225    let requested_protocol_set = exchanges
226        .iter()
227        .map(|(name, _)| name.clone())
228        .collect::<HashSet<_>>();
229
230    let not_requested_protocols = available_protocols_set
231        .difference(&requested_protocol_set)
232        .cloned()
233        .collect::<Vec<_>>();
234
235    if !not_requested_protocols.is_empty() {
236        info!("Other available protocols: {}", not_requested_protocols.join(", "));
237    }
238
239    for (name, address) in exchanges {
240        debug!("Registering exchange: {}", name);
241        let id = ExtractorIdentity { chain, name: name.clone() };
242        let filter = if address.is_some() {
243            ComponentFilter::Ids(vec![address.unwrap()])
244        } else if let (Some(remove_tvl), Some(add_tvl)) =
245            (args.remove_tvl_threshold, args.add_tvl_threshold)
246        {
247            ComponentFilter::with_tvl_range(remove_tvl as f64, add_tvl as f64)
248        } else {
249            ComponentFilter::with_tvl_range(args.min_tvl as f64, args.min_tvl as f64)
250        };
251        let sync = ProtocolStateSynchronizer::new(
252            id.clone(),
253            true,
254            filter,
255            3,
256            !args.no_state,
257            args.include_tvl,
258            rpc_client.clone(),
259            ws_client.clone(),
260            args.block_time + args.timeout,
261        );
262        block_sync = block_sync.register_synchronizer(id, sync);
263    }
264
265    let (sync_jh, mut rx) = block_sync
266        .run()
267        .await
268        .map_err(|e| format!("Failed to start block synchronizer: {e}"))?;
269
270    let msg_printer = tokio::spawn(async move {
271        while let Some(msg) = rx.recv().await {
272            if let Ok(msg_json) = serde_json::to_string(&msg) {
273                println!("{msg_json}");
274            } else {
275                error!("Failed to serialize FeedMessage");
276            }
277        }
278    });
279
280    // Monitor the WebSocket, BlockSynchronizer and message printer futures.
281    tokio::select! {
282        res = ws_jh => {
283            if let Err(e) = res {
284                error!("WebSocket connection dropped unexpectedly: {}", e);
285            }
286        }
287        res = sync_jh => {
288            if let Err(e) = res {
289                error!("BlockSynchronizer stopped unexpectedly: {}", e);
290            }
291        }
292        res = msg_printer => {
293            if let Err(e) = res {
294                error!("Message printer stopped unexpectedly: {}", e);
295            }
296        }
297    }
298
299    debug!("RX closed");
300    Ok(())
301}
302
303#[cfg(test)]
304mod cli_tests {
305    use clap::Parser;
306
307    use super::CliArgs;
308
309    #[tokio::test]
310    async fn test_cli_args() {
311        let args = CliArgs::parse_from([
312            "tycho-client",
313            "--tycho-url",
314            "localhost:5000",
315            "--exchange",
316            "uniswap_v2",
317            "--min-tvl",
318            "3000",
319            "--block-time",
320            "50",
321            "--timeout",
322            "5",
323            "--log-folder",
324            "test_logs",
325            "--example",
326            "--max-messages",
327            "1",
328        ]);
329        let exchanges: Vec<String> = vec!["uniswap_v2".to_string()];
330        assert_eq!(args.tycho_url, "localhost:5000");
331        assert_eq!(args.exchange, exchanges);
332        assert_eq!(args.min_tvl, 3000);
333        assert_eq!(args.block_time, 50);
334        assert_eq!(args.timeout, 5);
335        assert_eq!(args.log_folder, "test_logs");
336        assert_eq!(args.max_messages, Some(1));
337        assert!(args.example);
338    }
339}