Skip to main content

tycho_client/
cli.rs

1use std::{path::Path, str::FromStr};
2
3use clap::Parser;
4use tracing::info;
5use tracing_appender::rolling;
6use tycho_common::{dto::TvlThresholdTier, models::Chain};
7
8use crate::{
9    feed::{component_tracker::ComponentFilter, dto::FeedMessage as FeedMessageDto},
10    stream::TychoStreamBuilder,
11};
12
13/// Tycho Client CLI - A tool for indexing and tracking blockchain protocol data
14///
15/// This CLI tool connects to a Tycho server and tracks various blockchain protocols,
16/// providing real-time updates about their state.
17#[derive(Parser, Debug, Clone, PartialEq)]
18#[clap(version = env!("CARGO_PKG_VERSION"))]
19struct CliArgs {
20    /// Tycho server URL, without protocol. Example: localhost:4242
21    #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
22    tycho_url: String,
23
24    /// Tycho gateway API key, used as authentication for both websocket and http connections.
25    /// Can be set with TYCHO_AUTH_TOKEN env variable.
26    #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
27    auth_key: Option<String>,
28
29    /// If set, use unsecured transports: http and ws instead of https and wss.
30    #[clap(long)]
31    no_tls: bool,
32
33    /// The blockchain to index on
34    #[clap(short = 'c', long, default_value = "ethereum")]
35    pub chain: String,
36
37    /// Specifies exchanges. Optionally also supply a pool address in the format
38    /// {exchange}-{pool_address}
39    #[clap(short = 'e', long, number_of_values = 1)]
40    exchange: Vec<String>,
41
42    /// Specifies the minimum TVL to filter the components. Denoted in the native token.
43    /// Defaults to a chain-appropriate value targeting ~$20K USD equivalent.
44    /// Ignored if addresses or range tvl values are provided.
45    #[clap(long)]
46    min_tvl: Option<f64>,
47
48    /// Specifies the lower bound of the TVL threshold range. Denoted in the native token (e.g.
49    /// Mainnet -> ETH). Components below this TVL will be removed from tracking.
50    #[clap(long)]
51    remove_tvl_threshold: Option<f64>,
52
53    /// Specifies the upper bound of the TVL threshold range. Denoted in the native token (e.g.
54    /// Mainnet -> ETH). Components above this TVL will be added to tracking.
55    #[clap(long)]
56    add_tvl_threshold: Option<f64>,
57
58    /// Expected block time in seconds. Defaults to the canonical block time for the selected
59    /// chain (e.g. 12s for Ethereum, 2s for Base).
60    ///
61    /// Adjusting `block_time` helps balance efficiency and responsiveness:
62    /// - **Low values**: Increase sync frequency but may waste resources on retries.
63    /// - **High values**: Reduce sync frequency but may delay updates on faster chains.
64    #[clap(long)]
65    block_time: Option<u64>,
66
67    /// Maximum wait time in seconds beyond the block time. Useful for handling
68    /// chains with variable block intervals or network delays. Defaults to a chain-appropriate
69    /// value.
70    #[clap(long)]
71    timeout: Option<u64>,
72
73    /// Logging folder path.
74    #[clap(long, default_value = "logs")]
75    log_folder: String,
76
77    /// Run the example on a single block with UniswapV2 and UniswapV3.
78    #[clap(long)]
79    example: bool,
80
81    /// If set, only component and tokens are streamed, any snapshots or state updates
82    /// are omitted from the stream.
83    #[clap(long)]
84    no_state: bool,
85
86    /// Maximum amount of messages to process before exiting. Useful for debugging e.g.
87    /// to easily get a state sync messages for a fixture. Alternatively this may be
88    /// used to trigger a regular restart or resync.
89    #[clap(short='n', long, default_value=None)]
90    max_messages: Option<usize>,
91
92    /// Maximum blocks an exchange can be absent for before it is marked as stale. Used
93    /// in conjunction with block_time to calculate a timeout: block_time * max_missed_blocks.
94    /// Defaults to a chain-appropriate value.
95    #[clap(long)]
96    max_missed_blocks: Option<u64>,
97
98    /// If set, the synchronizer will include TVL in the messages.
99    /// Enabling this option will increase the number of network requests made during start-up,
100    /// which may result in increased start-up latency.
101    #[clap(long)]
102    include_tvl: bool,
103
104    /// If set, disable compression for WebSocket messages.
105    /// By default, messages are compressed using zstd.
106    #[clap(long)]
107    disable_compression: bool,
108
109    /// If set, enables receiving partial block updates (flashblocks).
110    /// This allows the client to receive incremental updates within a block, allowing for
111    /// lower latency.
112    #[clap(long)]
113    partial_blocks: bool,
114
115    /// Enable verbose logging. This will show more detailed information about the
116    /// synchronization process and any errors that occur.
117    #[clap(long)]
118    verbose: bool,
119
120    /// Maximum number of retry attempts for failed startups
121    #[clap(long, default_value = "32")]
122    max_retries: u64,
123
124    /// Path to a TOML file containing component IDs to exclude from tracking.
125    #[clap(long)]
126    blocklist_config: Option<std::path::PathBuf>,
127}
128
129impl CliArgs {
130    fn validate(&self) -> Result<(), String> {
131        // TVL thresholds must be set together - either both or neither
132        match (self.remove_tvl_threshold, self.add_tvl_threshold) {
133            (Some(remove), Some(add)) if remove >= add => {
134                return Err("remove_tvl_threshold must be less than add_tvl_threshold".to_string());
135            }
136            (Some(_), None) | (None, Some(_)) => {
137                return Err(
138                    "Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string()
139                );
140            }
141            _ => {}
142        }
143
144        Ok(())
145    }
146}
147
148#[derive(serde::Deserialize)]
149struct BlocklistFile {
150    ids: Vec<String>,
151}
152
153fn load_blocklist(path: &Path) -> Result<Vec<String>, String> {
154    let content = std::fs::read_to_string(path)
155        .map_err(|e| format!("Failed to read blocklist file {}: {e}", path.display()))?;
156    let file: BlocklistFile = toml::from_str(&content)
157        .map_err(|e| format!("Failed to parse blocklist file {}: {e}", path.display()))?;
158    Ok(file.ids)
159}
160
161pub async fn run_cli() -> Result<(), String> {
162    // Parse CLI Args
163    let args: CliArgs = CliArgs::parse();
164    args.validate()?;
165
166    // Setup Logging
167    let log_level = if args.verbose { "debug" } else { "info" };
168    let (non_blocking, _guard) =
169        tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
170    let subscriber = tracing_subscriber::fmt()
171        .with_env_filter(
172            tracing_subscriber::EnvFilter::try_from_default_env()
173                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level)),
174        )
175        .with_writer(non_blocking)
176        .finish();
177
178    tracing::subscriber::set_global_default(subscriber)
179        .map_err(|e| format!("Failed to set up logging subscriber: {e}"))?;
180
181    // Build the list of exchanges.  When --example is provided, we seed the list with a fixed
182    // pair of well-known pools, otherwise we parse user supplied values (either plain exchange
183    // names or exchange-pool pairs in the {exchange}-{pool_address} format).
184    let exchanges: Vec<(String, Option<String>)> = if args.example {
185        // You will need to port-forward tycho to run the example:
186        //
187        // ```bash
188        // kubectl port-forward -n dev-tycho deploy/tycho-indexer 8888:4242
189        // ```
190        vec![
191            (
192                "uniswap_v3".to_string(),
193                Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
194            ),
195            (
196                "uniswap_v2".to_string(),
197                Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
198            ),
199        ]
200    } else {
201        args.exchange
202            .iter()
203            .filter_map(|e| {
204                if e.contains('-') {
205                    let parts: Vec<&str> = e.split('-').collect();
206                    if parts.len() == 2 {
207                        Some((parts[0].to_string(), Some(parts[1].to_string())))
208                    } else {
209                        tracing::warn!("Ignoring invalid exchange format: {}", e);
210                        None
211                    }
212                } else {
213                    Some((e.to_string(), None))
214                }
215            })
216            .collect()
217    };
218
219    info!("Running with exchanges: {:?}", exchanges);
220
221    run(exchanges, args).await?;
222    Ok(())
223}
224
225async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<(), String> {
226    let blocklist = match &args.blocklist_config {
227        Some(path) => load_blocklist(path)?,
228        None => Vec::new(),
229    };
230
231    let chain = Chain::from_str(&args.chain)
232        .map_err(|_| format!("Unknown chain: {chain}", chain = args.chain))?;
233
234    let mut builder = TychoStreamBuilder::new(&args.tycho_url, chain)
235        .auth_key(args.auth_key)
236        .no_tls(args.no_tls)
237        .no_state(args.no_state)
238        .include_tvl(args.include_tvl)
239        .max_retries(args.max_retries)
240        .blocklisted_ids(blocklist);
241
242    if let Some(bt) = args.block_time {
243        builder = builder.block_time(bt);
244    }
245    if let Some(to) = args.timeout {
246        builder = builder.timeout(to);
247    }
248    if let Some(mmb) = args.max_missed_blocks {
249        builder = builder.max_missed_blocks(mmb);
250    }
251    if args.disable_compression {
252        builder = builder.disable_compression();
253    }
254    if args.partial_blocks {
255        builder = builder.enable_partial_blocks();
256    }
257    if let Some(n) = args.max_messages {
258        builder = builder.max_messages(n);
259    }
260    // Register exchanges
261    let builder = exchanges
262        .into_iter()
263        .fold(builder, |b, (name, address)| {
264            let filter = if let Some(addr) = address {
265                ComponentFilter::Ids(vec![addr])
266            } else if let (Some(remove_tvl), Some(add_tvl)) =
267                (args.remove_tvl_threshold, args.add_tvl_threshold)
268            {
269                ComponentFilter::with_tvl_range(remove_tvl, add_tvl)
270            } else {
271                let default_min_tvl = chain.default_tvl_threshold(TvlThresholdTier::Low);
272                let min_tvl = args.min_tvl.unwrap_or(default_min_tvl);
273                ComponentFilter::with_tvl_range(min_tvl, min_tvl)
274            };
275            b.exchange(&name, filter)
276        });
277
278    let (handle, mut rx) = builder
279        .build()
280        .await
281        .map_err(|e| e.to_string())?;
282
283    let msg_printer = tokio::spawn(async move {
284        while let Some(result) = rx.recv().await {
285            let msg =
286                result.map_err(|e| format!("Message printer received synchronizer error: {e}"))?;
287
288            let json = serde_json::to_string(&FeedMessageDto::from(msg))
289                .map_err(|e| format!("Message printer failed to serialize message: {e}"))?;
290            println!("{json}");
291        }
292
293        Ok::<(), String>(())
294    });
295
296    // Monitor the stream handle and message printer futures.
297    let (failed_task, shutdown_reason) = tokio::select! {
298        res = handle => (
299            "Stream",
300            res.err().map(|e| e.to_string())
301        ),
302        res = msg_printer => (
303            "MessagePrinter",
304            extract_nested_error(res)
305        ),
306    };
307
308    Err(format!(
309        "{failed_task} task terminated: {}",
310        shutdown_reason.unwrap_or("unknown reason".to_string())
311    ))
312}
313
/// Collapses a two-level `Result` into the message of whichever layer failed.
///
/// The outer error wins when present, otherwise the inner error is used. Returns `None`
/// only when both layers are `Ok`.
#[inline]
fn extract_nested_error<T, E1: ToString, E2: ToString>(
    res: Result<Result<T, E1>, E2>,
) -> Option<String> {
    match res {
        Err(outer) => Some(outer.to_string()),
        Ok(Err(inner)) => Some(inner.to_string()),
        Ok(Ok(_)) => None,
    }
}
322
#[cfg(test)]
mod cli_tests {
    use clap::Parser;

    use super::CliArgs;

    /// Explicitly passed flags end up in their fields; flags that are not passed stay off.
    // Argument parsing is synchronous, so a plain `#[test]` is enough here.
    #[test]
    fn test_cli_args() {
        let args = CliArgs::parse_from([
            "tycho-client",
            "--tycho-url",
            "localhost:5000",
            "--exchange",
            "uniswap_v2",
            "--min-tvl",
            "3000",
            "--block-time",
            "50",
            "--timeout",
            "5",
            "--log-folder",
            "test_logs",
            "--example",
            "--max-messages",
            "1",
            "--blocklist-config",
            "blocklist.toml",
        ]);
        let exchanges: Vec<String> = vec!["uniswap_v2".to_string()];
        assert_eq!(args.tycho_url, "localhost:5000");
        assert_eq!(args.exchange, exchanges);
        assert_eq!(args.min_tvl, Some(3000.0));
        assert_eq!(args.block_time, Some(50));
        assert_eq!(args.timeout, Some(5));
        assert_eq!(args.log_folder, "test_logs");
        assert_eq!(args.max_messages, Some(1));
        assert!(args.example);
        assert!(!args.disable_compression);
        assert!(!args.partial_blocks);
        assert_eq!(args.blocklist_config, Some(std::path::PathBuf::from("blocklist.toml")));
    }

    /// The TVL range thresholds are only valid as an ordered pair.
    #[test]
    fn test_validate_tvl_thresholds() {
        let neither = CliArgs::parse_from(["tycho-client"]);
        assert!(neither.validate().is_ok());

        let ordered = CliArgs::parse_from([
            "tycho-client",
            "--remove-tvl-threshold",
            "5",
            "--add-tvl-threshold",
            "10",
        ]);
        assert!(ordered.validate().is_ok());

        let inverted = CliArgs::parse_from([
            "tycho-client",
            "--remove-tvl-threshold",
            "10",
            "--add-tvl-threshold",
            "5",
        ]);
        assert!(inverted.validate().is_err());

        let only_add = CliArgs::parse_from(["tycho-client", "--add-tvl-threshold", "10"]);
        assert!(only_add.validate().is_err());
    }
}