Skip to main content

tycho_client/
cli.rs

1use std::{collections::HashSet, path::Path, str::FromStr, time::Duration};
2
3use clap::Parser;
4use tracing::{debug, error, info, warn};
5use tracing_appender::rolling;
6use tycho_common::dto::{Chain, ExtractorIdentity};
7
8use crate::{
9    deltas::DeltasClient,
10    feed::{
11        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
12        BlockSynchronizer,
13    },
14    rpc::HttpRPCClientOptions,
15    stream::ProtocolSystemsInfo,
16    HttpRPCClient, WsDeltasClient,
17};
18
// NOTE: clap's derive turns the `///` doc comments in this struct into the `--help` text, so
// rewording them changes user-visible output. Maintainer-only notes use plain `//` comments.
/// Tycho Client CLI - A tool for indexing and tracking blockchain protocol data
///
/// This CLI tool connects to a Tycho server and tracks various blockchain protocols,
/// providing real-time updates about their state.
#[derive(Parser, Debug, Clone, PartialEq)]
#[clap(version = env!("CARGO_PKG_VERSION"))]
struct CliArgs {
    // `run` prepends the scheme (ws/wss for the WebSocket client, http/https for the RPC client).
    /// Tycho server URL, without protocol. Example: localhost:4242
    #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
    tycho_url: String,

    // `run` also falls back to ws:// and http:// whenever this is `None`, even without --no-tls.
    /// Tycho gateway API key, used as authentication for both websocket and http connections.
    /// Can be set with TYCHO_AUTH_TOKEN env variable.
    #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
    auth_key: Option<String>,

    /// If set, use unsecured transports: http and ws instead of https and wss.
    #[clap(long)]
    no_tls: bool,

    // Parsed with `Chain::from_str` in `run`; an unrecognised value aborts with "Unknown chain".
    /// The blockchain to index on
    #[clap(short = 'c', long, default_value = "ethereum")]
    pub chain: String,

    // Parsed in `run_cli`; not consulted at all when --example is set.
    /// Specifies exchanges. Optionally also supply a pool address in the format
    /// {exchange}-{pool_address}
    #[clap(short = 'e', long, number_of_values = 1)]
    exchange: Vec<String>,

    // `run` passes this value as both bounds of the TVL range when the two thresholds below are
    // unset and the exchange has no explicit pool address.
    /// Specifies the minimum TVL to filter the components. Denoted in the native token (e.g.
    /// Mainnet -> ETH). Ignored if addresses or range tvl values are provided.
    #[clap(long, default_value = "10")]
    min_tvl: f64,

    // `validate` requires this and `add_tvl_threshold` to be set together, with remove < add.
    /// Specifies the lower bound of the TVL threshold range. Denoted in the native token (e.g.
    /// Mainnet -> ETH). Components below this TVL will be removed from tracking.
    #[clap(long)]
    remove_tvl_threshold: Option<f64>,

    /// Specifies the upper bound of the TVL threshold range. Denoted in the native token (e.g.
    /// Mainnet -> ETH). Components above this TVL will be added to tracking.
    #[clap(long)]
    add_tvl_threshold: Option<f64>,

    // Besides the block synchronizer, `run` hands `block_time / 2` (integer division) and
    // `block_time + timeout` to every protocol synchronizer.
    /// Expected block time in seconds. For blockchains with consistent intervals,
    /// set to the average block time (e.g., "600" for a 10-minute interval).
    ///
    /// Adjusting `block_time` helps balance efficiency and responsiveness:
    /// - **Low values**: Increase sync frequency but may waste resources on retries.
    /// - **High values**: Reduce sync frequency but may delay updates on faster chains.
    #[clap(long, default_value = "600")]
    block_time: u64,

    /// Maximum wait time in seconds beyond the block time. Useful for handling
    /// chains with variable block intervals or network delays.
    #[clap(long, default_value = "1")]
    timeout: u64,

    // `run_cli` writes a single non-rolling "dev_logs.log" file into this folder.
    /// Logging folder path.
    #[clap(long, default_value = "logs")]
    log_folder: String,

    // `run_cli` substitutes two hard-coded pools (one uniswap_v3, one uniswap_v2) for --exchange.
    /// Run the example on a single block with UniswapV2 and UniswapV3.
    #[clap(long)]
    example: bool,

    // Inverted before use: `run` passes `!no_state` to `ProtocolStateSynchronizer::new`.
    /// If set, only component and tokens are streamed, any snapshots or state updates
    /// are omitted from the stream.
    #[clap(long)]
    no_state: bool,

    /// Maximum amount of messages to process before exiting. Useful for debugging e.g.
    /// to easily get a state sync messages for a fixture. Alternatively this may be
    /// used to trigger a regular restart or resync.
    #[clap(short='n', long, default_value=None)]
    max_messages: Option<usize>,

    /// Maximum blocks an exchange can be absent for before it is marked as stale. Used
    /// in conjunction with block_time to calculate a timeout: block_time * max_missed_blocks.
    #[clap(long, default_value = "10")]
    max_missed_blocks: u64,

    /// If set, the synchronizer will include TVL in the messages.
    /// Enabling this option will increase the number of network requests made during start-up,
    /// which may result in increased start-up latency.
    #[clap(long)]
    include_tvl: bool,

    // Inverted before use: `run` passes `!disable_compression` to both the RPC client options and
    // each protocol synchronizer.
    /// If set, disable compression for WebSocket messages.
    /// By default, messages are compressed using zstd.
    #[clap(long)]
    disable_compression: bool,

    /// If set, enables receiving partial block updates (flashblocks).
    /// This allows the client to receive incremental updates within a block, allowing for
    /// lower latency.
    #[clap(long)]
    partial_blocks: bool,

    // Only a fallback: `run_cli` uses "debug"/"info" when `EnvFilter::try_from_default_env()`
    // fails, otherwise the environment-provided filter wins.
    /// Enable verbose logging. This will show more detailed information about the
    /// synchronization process and any errors that occur.
    #[clap(long)]
    verbose: bool,

    /// Maximum number of retry attempts for failed startups
    #[clap(long, default_value = "32")]
    max_retries: u64,

    // Loaded by `load_blocklist` at the start of `run`; the resulting IDs are attached to every
    // exchange's component filter.
    /// Path to a TOML file containing component IDs to exclude from tracking.
    #[clap(long)]
    blocklist_config: Option<std::path::PathBuf>,
}
131
132impl CliArgs {
133    fn validate(&self) -> Result<(), String> {
134        // TVL thresholds must be set together - either both or neither
135        match (self.remove_tvl_threshold, self.add_tvl_threshold) {
136            (Some(remove), Some(add)) if remove >= add => {
137                return Err("remove_tvl_threshold must be less than add_tvl_threshold".to_string());
138            }
139            (Some(_), None) | (None, Some(_)) => {
140                return Err(
141                    "Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string()
142                );
143            }
144            _ => {}
145        }
146
147        Ok(())
148    }
149}
150
/// Deserialization target for the TOML file given via `--blocklist-config`.
///
/// `load_blocklist` parses the file into this struct and returns its `ids`.
#[derive(serde::Deserialize)]
struct BlocklistFile {
    /// Component IDs to exclude from tracking. There is no `#[serde(default)]` on this field, so
    /// the `ids` key is expected to be present in the file.
    ids: Vec<String>,
}
155
156fn load_blocklist(path: &Path) -> Result<Vec<String>, String> {
157    let content = std::fs::read_to_string(path)
158        .map_err(|e| format!("Failed to read blocklist file {}: {e}", path.display()))?;
159    let file: BlocklistFile = toml::from_str(&content)
160        .map_err(|e| format!("Failed to parse blocklist file {}: {e}", path.display()))?;
161    Ok(file.ids)
162}
163
164pub async fn run_cli() -> Result<(), String> {
165    // Parse CLI Args
166    let args: CliArgs = CliArgs::parse();
167    args.validate()?;
168
169    // Setup Logging
170    let log_level = if args.verbose { "debug" } else { "info" };
171    let (non_blocking, _guard) =
172        tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
173    let subscriber = tracing_subscriber::fmt()
174        .with_env_filter(
175            tracing_subscriber::EnvFilter::try_from_default_env()
176                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level)),
177        )
178        .with_writer(non_blocking)
179        .finish();
180
181    tracing::subscriber::set_global_default(subscriber)
182        .map_err(|e| format!("Failed to set up logging subscriber: {e}"))?;
183
184    // Build the list of exchanges.  When --example is provided, we seed the list with a fixed
185    // pair of well-known pools, otherwise we parse user supplied values (either plain exchange
186    // names or exchange-pool pairs in the {exchange}-{pool_address} format).
187    let exchanges: Vec<(String, Option<String>)> = if args.example {
188        // You will need to port-forward tycho to run the example:
189        //
190        // ```bash
191        // kubectl port-forward -n dev-tycho deploy/tycho-indexer 8888:4242
192        // ```
193        vec![
194            (
195                "uniswap_v3".to_string(),
196                Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
197            ),
198            (
199                "uniswap_v2".to_string(),
200                Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
201            ),
202        ]
203    } else {
204        args.exchange
205            .iter()
206            .filter_map(|e| {
207                if e.contains('-') {
208                    let parts: Vec<&str> = e.split('-').collect();
209                    if parts.len() == 2 {
210                        Some((parts[0].to_string(), Some(parts[1].to_string())))
211                    } else {
212                        warn!("Ignoring invalid exchange format: {}", e);
213                        None
214                    }
215                } else {
216                    Some((e.to_string(), None))
217                }
218            })
219            .collect()
220    };
221
222    info!("Running with exchanges: {:?}", exchanges);
223
224    run(exchanges, args).await?;
225    Ok(())
226}
227
228async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<(), String> {
229    let blocklist = match &args.blocklist_config {
230        Some(path) => load_blocklist(path)?,
231        None => Vec::new(),
232    };
233
234    info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
235    //TODO: remove "or args.auth_key.is_none()" when our internal client use the no_tls flag
236    let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
237        info!("Using non-secure connection: ws:// and http://");
238        let tycho_ws_url = format!("ws://{url}", url = &args.tycho_url);
239        let tycho_rpc_url = format!("http://{url}", url = &args.tycho_url);
240        (tycho_ws_url, tycho_rpc_url)
241    } else {
242        info!("Using secure connection: wss:// and https://");
243        let tycho_ws_url = format!("wss://{url}", url = &args.tycho_url);
244        let tycho_rpc_url = format!("https://{url}", url = &args.tycho_url);
245        (tycho_ws_url, tycho_rpc_url)
246    };
247
248    let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref())
249        .map_err(|e| format!("Failed to create WebSocket client: {e}"))?;
250    let rpc_client = HttpRPCClient::new(
251        &tycho_rpc_url,
252        HttpRPCClientOptions::new()
253            .with_auth_key(args.auth_key.clone())
254            .with_compression(!args.disable_compression),
255    )
256    .map_err(|e| format!("Failed to create RPC client: {e}"))?;
257    let chain = Chain::from_str(&args.chain)
258        .map_err(|_| format!("Unknown chain: {chain}", chain = &args.chain))?;
259    let ws_jh = ws_client
260        .connect()
261        .await
262        .map_err(|e| format!("WebSocket client connection error: {e}"))?;
263
264    let mut block_sync = BlockSynchronizer::new(
265        Duration::from_secs(args.block_time),
266        Duration::from_secs(args.timeout),
267        args.max_missed_blocks,
268    );
269
270    if let Some(mm) = &args.max_messages {
271        block_sync.max_messages(*mm);
272    }
273
274    let requested_protocol_set: HashSet<_> = exchanges
275        .iter()
276        .map(|(name, _)| name.clone())
277        .collect();
278    let protocol_info =
279        ProtocolSystemsInfo::fetch(&rpc_client, chain, &requested_protocol_set).await;
280    protocol_info.log_other_available();
281    let dci_protocols = protocol_info.dci_protocols;
282
283    for (name, address) in exchanges {
284        debug!("Registering exchange: {}", name);
285        let id = ExtractorIdentity { chain, name: name.clone() };
286        let filter = if let Some(address) = address {
287            ComponentFilter::Ids(vec![address])
288        } else if let (Some(remove_tvl), Some(add_tvl)) =
289            (args.remove_tvl_threshold, args.add_tvl_threshold)
290        {
291            ComponentFilter::with_tvl_range(remove_tvl, add_tvl)
292        } else {
293            ComponentFilter::with_tvl_range(args.min_tvl, args.min_tvl)
294        }
295        .blocklist(blocklist.clone());
296        let uses_dci = dci_protocols.contains(&name);
297        let sync = ProtocolStateSynchronizer::new(
298            id.clone(),
299            true,
300            filter,
301            args.max_retries,
302            Duration::from_secs(args.block_time / 2),
303            !args.no_state,
304            args.include_tvl,
305            !args.disable_compression,
306            rpc_client.clone(),
307            ws_client.clone(),
308            args.block_time + args.timeout,
309        )
310        .with_dci(uses_dci)
311        .with_partial_blocks(args.partial_blocks);
312        block_sync = block_sync.register_synchronizer(id, sync);
313    }
314
315    let (sync_jh, mut rx) = block_sync
316        .run()
317        .await
318        .map_err(|e| format!("Failed to start block synchronizer: {e}"))?;
319
320    let msg_printer = tokio::spawn(async move {
321        while let Some(result) = rx.recv().await {
322            let msg =
323                result.map_err(|e| format!("Message printer received synchronizer error: {e}"))?;
324
325            if let Ok(msg_json) = serde_json::to_string(&msg) {
326                println!("{msg_json}");
327            } else {
328                // Log the error but continue processing further messages.
329                error!("Failed to serialize FeedMessage");
330            };
331        }
332
333        Ok::<(), String>(())
334    });
335
336    // Monitor the WebSocket, BlockSynchronizer and message printer futures.
337    let (failed_task, shutdown_reason) = tokio::select! {
338        res = ws_jh => (
339            "WebSocket",
340            extract_nested_error(res)
341        ),
342        res = sync_jh => (
343            "BlockSynchronizer",
344            extract_nested_error::<_, _, String>(Ok(res))
345            ),
346        res = msg_printer => (
347            "MessagePrinter",
348            extract_nested_error(res)
349        )
350    };
351
352    debug!("RX closed");
353    Err(format!(
354        "{failed_task} task terminated: {}",
355        shutdown_reason.unwrap_or("unknown reason".to_string())
356    ))
357}
358
/// Flattens a two-level `Result` into the text of whichever error it carries.
///
/// Returns `Some` with the outer error's string form when the outer layer failed, `Some` with
/// the inner error's string form when only the inner layer failed, and `None` when both layers
/// are `Ok`.
#[inline]
fn extract_nested_error<T, E1: ToString, E2: ToString>(
    res: Result<Result<T, E1>, E2>,
) -> Option<String> {
    match res {
        Err(outer) => Some(outer.to_string()),
        Ok(Err(inner)) => Some(inner.to_string()),
        Ok(Ok(_)) => None,
    }
}
367
#[cfg(test)]
mod cli_tests {
    use clap::Parser;

    use super::CliArgs;

    /// Parses `extra` as if it followed the binary name on the command line.
    fn parse(extra: &[&str]) -> CliArgs {
        CliArgs::parse_from(std::iter::once("tycho-client").chain(extra.iter().copied()))
    }

    // Argument parsing is synchronous, so plain `#[test]` functions are sufficient here.
    #[test]
    fn test_cli_args() {
        let args = parse(&[
            "--tycho-url",
            "localhost:5000",
            "--exchange",
            "uniswap_v2",
            "--min-tvl",
            "3000",
            "--block-time",
            "50",
            "--timeout",
            "5",
            "--log-folder",
            "test_logs",
            "--example",
            "--max-messages",
            "1",
            "--blocklist-config",
            "blocklist.toml",
        ]);
        let exchanges: Vec<String> = vec!["uniswap_v2".to_string()];
        assert_eq!(args.tycho_url, "localhost:5000");
        assert_eq!(args.exchange, exchanges);
        assert_eq!(args.min_tvl, 3000.0);
        assert_eq!(args.block_time, 50);
        assert_eq!(args.timeout, 5);
        assert_eq!(args.log_folder, "test_logs");
        assert_eq!(args.max_messages, Some(1));
        assert!(args.example);
        assert!(!args.disable_compression);
        assert!(!args.partial_blocks);
        assert_eq!(args.blocklist_config, Some(std::path::PathBuf::from("blocklist.toml")));
    }

    #[test]
    fn test_validate_accepts_unset_or_ordered_thresholds() {
        assert!(parse(&[]).validate().is_ok());
        assert!(parse(&["--remove-tvl-threshold", "5", "--add-tvl-threshold", "10"])
            .validate()
            .is_ok());
    }

    #[test]
    fn test_validate_rejects_single_threshold() {
        assert!(parse(&["--remove-tvl-threshold", "5"])
            .validate()
            .is_err());
        assert!(parse(&["--add-tvl-threshold", "10"])
            .validate()
            .is_err());
    }

    #[test]
    fn test_validate_rejects_unordered_thresholds() {
        // Equal bounds are rejected as well: remove must be strictly below add.
        assert!(parse(&["--remove-tvl-threshold", "10", "--add-tvl-threshold", "10"])
            .validate()
            .is_err());
        assert!(parse(&["--remove-tvl-threshold", "20", "--add-tvl-threshold", "10"])
            .validate()
            .is_err());
    }
}