Skip to main content

tycho_client/
cli.rs

1use std::{collections::HashSet, str::FromStr, time::Duration};
2
3use clap::Parser;
4use tracing::{debug, error, info, warn};
5use tracing_appender::rolling;
6use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
7
8use crate::{
9    deltas::DeltasClient,
10    feed::{
11        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
12        BlockSynchronizer,
13    },
14    rpc::{HttpRPCClientOptions, RPCClient},
15    HttpRPCClient, WsDeltasClient,
16};
17
18/// Tycho Client CLI - A tool for indexing and tracking blockchain protocol data
19///
20/// This CLI tool connects to a Tycho server and tracks various blockchain protocols,
21/// providing real-time updates about their state.
22#[derive(Parser, Debug, Clone, PartialEq)]
23#[clap(version = env!("CARGO_PKG_VERSION"))]
24struct CliArgs {
25    /// Tycho server URL, without protocol. Example: localhost:4242
26    #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
27    tycho_url: String,
28
29    /// Tycho gateway API key, used as authentication for both websocket and http connections.
30    /// Can be set with TYCHO_AUTH_TOKEN env variable.
31    #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
32    auth_key: Option<String>,
33
34    /// If set, use unsecured transports: http and ws instead of https and wss.
35    #[clap(long)]
36    no_tls: bool,
37
38    /// The blockchain to index on
39    #[clap(short = 'c', long, default_value = "ethereum")]
40    pub chain: String,
41
42    /// Specifies exchanges. Optionally also supply a pool address in the format
43    /// {exchange}-{pool_address}
44    #[clap(short = 'e', long, number_of_values = 1)]
45    exchange: Vec<String>,
46
47    /// Specifies the minimum TVL to filter the components. Denoted in the native token (e.g.
48    /// Mainnet -> ETH). Ignored if addresses or range tvl values are provided.
49    #[clap(long, default_value = "10")]
50    min_tvl: f64,
51
52    /// Specifies the lower bound of the TVL threshold range. Denoted in the native token (e.g.
53    /// Mainnet -> ETH). Components below this TVL will be removed from tracking.
54    #[clap(long)]
55    remove_tvl_threshold: Option<f64>,
56
57    /// Specifies the upper bound of the TVL threshold range. Denoted in the native token (e.g.
58    /// Mainnet -> ETH). Components above this TVL will be added to tracking.
59    #[clap(long)]
60    add_tvl_threshold: Option<f64>,
61
62    /// Expected block time in seconds. For blockchains with consistent intervals,
63    /// set to the average block time (e.g., "600" for a 10-minute interval).
64    ///
65    /// Adjusting `block_time` helps balance efficiency and responsiveness:
66    /// - **Low values**: Increase sync frequency but may waste resources on retries.
67    /// - **High values**: Reduce sync frequency but may delay updates on faster chains.
68    #[clap(long, default_value = "600")]
69    block_time: u64,
70
71    /// Maximum wait time in seconds beyond the block time. Useful for handling
72    /// chains with variable block intervals or network delays.
73    #[clap(long, default_value = "1")]
74    timeout: u64,
75
76    /// Logging folder path.
77    #[clap(long, default_value = "logs")]
78    log_folder: String,
79
80    /// Run the example on a single block with UniswapV2 and UniswapV3.
81    #[clap(long)]
82    example: bool,
83
84    /// If set, only component and tokens are streamed, any snapshots or state updates
85    /// are omitted from the stream.
86    #[clap(long)]
87    no_state: bool,
88
89    /// Maximum amount of messages to process before exiting. Useful for debugging e.g.
90    /// to easily get a state sync messages for a fixture. Alternatively this may be
91    /// used to trigger a regular restart or resync.
92    #[clap(short='n', long, default_value=None)]
93    max_messages: Option<usize>,
94
95    /// Maximum blocks an exchange can be absent for before it is marked as stale. Used
96    /// in conjunction with block_time to calculate a timeout: block_time * max_missed_blocks.
97    #[clap(long, default_value = "10")]
98    max_missed_blocks: u64,
99
100    /// If set, the synchronizer will include TVL in the messages.
101    /// Enabling this option will increase the number of network requests made during start-up,
102    /// which may result in increased start-up latency.
103    #[clap(long)]
104    include_tvl: bool,
105
106    /// If set, disable compression for WebSocket messages.
107    /// By default, messages are compressed using zstd.
108    #[clap(long)]
109    disable_compression: bool,
110
111    /// If set, enables receiving partial block updates (flashblocks).
112    /// This allows the client to receive incremental updates within a block, allowing for
113    /// lower latency.
114    #[clap(long)]
115    partial_blocks: bool,
116
117    /// Enable verbose logging. This will show more detailed information about the
118    /// synchronization process and any errors that occur.
119    #[clap(long)]
120    verbose: bool,
121}
122
123impl CliArgs {
124    fn validate(&self) -> Result<(), String> {
125        // TVL thresholds must be set together - either both or neither
126        match (self.remove_tvl_threshold, self.add_tvl_threshold) {
127            (Some(remove), Some(add)) if remove >= add => {
128                return Err("remove_tvl_threshold must be less than add_tvl_threshold".to_string());
129            }
130            (Some(_), None) | (None, Some(_)) => {
131                return Err(
132                    "Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string()
133                );
134            }
135            _ => {}
136        }
137
138        Ok(())
139    }
140}
141
142pub async fn run_cli() -> Result<(), String> {
143    // Parse CLI Args
144    let args: CliArgs = CliArgs::parse();
145    args.validate()?;
146
147    // Setup Logging
148    let log_level = if args.verbose { "debug" } else { "info" };
149    let (non_blocking, _guard) =
150        tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
151    let subscriber = tracing_subscriber::fmt()
152        .with_env_filter(
153            tracing_subscriber::EnvFilter::try_from_default_env()
154                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level)),
155        )
156        .with_writer(non_blocking)
157        .finish();
158
159    tracing::subscriber::set_global_default(subscriber)
160        .map_err(|e| format!("Failed to set up logging subscriber: {e}"))?;
161
162    // Build the list of exchanges.  When --example is provided, we seed the list with a fixed
163    // pair of well-known pools, otherwise we parse user supplied values (either plain exchange
164    // names or exchange-pool pairs in the {exchange}-{pool_address} format).
165    let exchanges: Vec<(String, Option<String>)> = if args.example {
166        // You will need to port-forward tycho to run the example:
167        //
168        // ```bash
169        // kubectl port-forward -n dev-tycho deploy/tycho-indexer 8888:4242
170        // ```
171        vec![
172            (
173                "uniswap_v3".to_string(),
174                Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
175            ),
176            (
177                "uniswap_v2".to_string(),
178                Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
179            ),
180        ]
181    } else {
182        args.exchange
183            .iter()
184            .filter_map(|e| {
185                if e.contains('-') {
186                    let parts: Vec<&str> = e.split('-').collect();
187                    if parts.len() == 2 {
188                        Some((parts[0].to_string(), Some(parts[1].to_string())))
189                    } else {
190                        warn!("Ignoring invalid exchange format: {}", e);
191                        None
192                    }
193                } else {
194                    Some((e.to_string(), None))
195                }
196            })
197            .collect()
198    };
199
200    info!("Running with exchanges: {:?}", exchanges);
201
202    run(exchanges, args).await?;
203    Ok(())
204}
205
206async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<(), String> {
207    info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
208    //TODO: remove "or args.auth_key.is_none()" when our internal client use the no_tls flag
209    let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
210        info!("Using non-secure connection: ws:// and http://");
211        let tycho_ws_url = format!("ws://{url}", url = &args.tycho_url);
212        let tycho_rpc_url = format!("http://{url}", url = &args.tycho_url);
213        (tycho_ws_url, tycho_rpc_url)
214    } else {
215        info!("Using secure connection: wss:// and https://");
216        let tycho_ws_url = format!("wss://{url}", url = &args.tycho_url);
217        let tycho_rpc_url = format!("https://{url}", url = &args.tycho_url);
218        (tycho_ws_url, tycho_rpc_url)
219    };
220
221    let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref())
222        .map_err(|e| format!("Failed to create WebSocket client: {e}"))?;
223    let rpc_client = HttpRPCClient::new(
224        &tycho_rpc_url,
225        HttpRPCClientOptions::new()
226            .with_auth_key(args.auth_key.clone())
227            .with_compression(!args.disable_compression),
228    )
229    .map_err(|e| format!("Failed to create RPC client: {e}"))?;
230    let chain = Chain::from_str(&args.chain)
231        .map_err(|_| format!("Unknown chain: {chain}", chain = &args.chain))?;
232    let ws_jh = ws_client
233        .connect()
234        .await
235        .map_err(|e| format!("WebSocket client connection error: {e}"))?;
236
237    let mut block_sync = BlockSynchronizer::new(
238        Duration::from_secs(args.block_time),
239        Duration::from_secs(args.timeout),
240        args.max_missed_blocks,
241    );
242
243    if let Some(mm) = &args.max_messages {
244        block_sync.max_messages(*mm);
245    }
246
247    let available_protocols_set = rpc_client
248        .get_protocol_systems(&ProtocolSystemsRequestBody {
249            chain,
250            pagination: PaginationParams { page: 0, page_size: 100 },
251        })
252        .await
253        .map_err(|e| format!("Failed to get protocol systems: {e}"))?
254        .protocol_systems
255        .into_iter()
256        .collect::<HashSet<_>>();
257
258    let requested_protocol_set = exchanges
259        .iter()
260        .map(|(name, _)| name.clone())
261        .collect::<HashSet<_>>();
262
263    let not_requested_protocols = available_protocols_set
264        .difference(&requested_protocol_set)
265        .cloned()
266        .collect::<Vec<_>>();
267
268    if !not_requested_protocols.is_empty() {
269        info!("Other available protocols: {}", not_requested_protocols.join(", "));
270    }
271
272    for (name, address) in exchanges {
273        debug!("Registering exchange: {}", name);
274        let id = ExtractorIdentity { chain, name: name.clone() };
275        let filter = if let Some(address) = address {
276            ComponentFilter::Ids(vec![address])
277        } else if let (Some(remove_tvl), Some(add_tvl)) =
278            (args.remove_tvl_threshold, args.add_tvl_threshold)
279        {
280            ComponentFilter::with_tvl_range(remove_tvl, add_tvl)
281        } else {
282            ComponentFilter::with_tvl_range(args.min_tvl, args.min_tvl)
283        };
284        let sync = ProtocolStateSynchronizer::new(
285            id.clone(),
286            true,
287            filter,
288            32,
289            Duration::from_secs(args.block_time / 2),
290            !args.no_state,
291            args.include_tvl,
292            !args.disable_compression,
293            rpc_client.clone(),
294            ws_client.clone(),
295            args.block_time + args.timeout,
296        )
297        .with_partial_blocks(args.partial_blocks);
298        block_sync = block_sync.register_synchronizer(id, sync);
299    }
300
301    let (sync_jh, mut rx) = block_sync
302        .run()
303        .await
304        .map_err(|e| format!("Failed to start block synchronizer: {e}"))?;
305
306    let msg_printer = tokio::spawn(async move {
307        while let Some(result) = rx.recv().await {
308            let msg =
309                result.map_err(|e| format!("Message printer received synchronizer error: {e}"))?;
310
311            if let Ok(msg_json) = serde_json::to_string(&msg) {
312                println!("{msg_json}");
313            } else {
314                // Log the error but continue processing further messages.
315                error!("Failed to serialize FeedMessage");
316            };
317        }
318
319        Ok::<(), String>(())
320    });
321
322    // Monitor the WebSocket, BlockSynchronizer and message printer futures.
323    let (failed_task, shutdown_reason) = tokio::select! {
324        res = ws_jh => (
325            "WebSocket",
326            extract_nested_error(res)
327        ),
328        res = sync_jh => (
329            "BlockSynchronizer",
330            extract_nested_error::<_, _, String>(Ok(res))
331            ),
332        res = msg_printer => (
333            "MessagePrinter",
334            extract_nested_error(res)
335        )
336    };
337
338    debug!("RX closed");
339    Err(format!(
340        "{failed_task} task terminated: {}",
341        shutdown_reason.unwrap_or("unknown reason".to_string())
342    ))
343}
344
345#[inline]
346fn extract_nested_error<T, E1: ToString, E2: ToString>(
347    res: Result<Result<T, E1>, E2>,
348) -> Option<String> {
349    res.map_err(|e| e.to_string())
350        .and_then(|r| r.map_err(|e| e.to_string()))
351        .err()
352}
353
354#[cfg(test)]
355mod cli_tests {
356    use clap::Parser;
357
358    use super::CliArgs;
359
360    #[tokio::test]
361    async fn test_cli_args() {
362        let args = CliArgs::parse_from([
363            "tycho-client",
364            "--tycho-url",
365            "localhost:5000",
366            "--exchange",
367            "uniswap_v2",
368            "--min-tvl",
369            "3000",
370            "--block-time",
371            "50",
372            "--timeout",
373            "5",
374            "--log-folder",
375            "test_logs",
376            "--example",
377            "--max-messages",
378            "1",
379        ]);
380        let exchanges: Vec<String> = vec!["uniswap_v2".to_string()];
381        assert_eq!(args.tycho_url, "localhost:5000");
382        assert_eq!(args.exchange, exchanges);
383        assert_eq!(args.min_tvl, 3000.0);
384        assert_eq!(args.block_time, 50);
385        assert_eq!(args.timeout, 5);
386        assert_eq!(args.log_folder, "test_logs");
387        assert_eq!(args.max_messages, Some(1));
388        assert!(args.example);
389        assert_eq!(args.disable_compression, false);
390        assert_eq!(args.partial_blocks, false);
391    }
392}