tycho-client 0.157.4

A library and CLI tool for querying and accessing liquidity data from Tycho indexer.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
use std::{collections::HashSet, path::Path, str::FromStr, time::Duration};

use clap::Parser;
use tracing::{debug, error, info, warn};
use tracing_appender::rolling;
use tycho_common::dto::{Chain, ExtractorIdentity};

use crate::{
    deltas::DeltasClient,
    feed::{
        component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
        BlockSynchronizer,
    },
    rpc::HttpRPCClientOptions,
    stream::ProtocolSystemsInfo,
    HttpRPCClient, WsDeltasClient,
};

/// Tycho Client CLI - A tool for indexing and tracking blockchain protocol data
///
/// This CLI tool connects to a Tycho server and tracks various blockchain protocols,
/// providing real-time updates about their state.
#[derive(Parser, Debug, Clone, PartialEq)]
#[clap(version = env!("CARGO_PKG_VERSION"))]
struct CliArgs {
    /// Tycho server URL, without protocol. Example: localhost:4242
    #[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
    tycho_url: String,

    /// Tycho gateway API key, used as authentication for both websocket and http connections.
    /// Can be set with TYCHO_AUTH_TOKEN env variable.
    #[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
    auth_key: Option<String>,

    /// If set, use unsecured transports: http and ws instead of https and wss.
    #[clap(long)]
    no_tls: bool,

    /// The blockchain to index on
    #[clap(short = 'c', long, default_value = "ethereum")]
    pub chain: String,

    /// Specifies exchanges. Optionally also supply a pool address in the format
    /// {exchange}-{pool_address}
    #[clap(short = 'e', long, number_of_values = 1)]
    exchange: Vec<String>,

    /// Specifies the minimum TVL to filter the components. Denoted in the native token (e.g.
    /// Mainnet -> ETH). Ignored if addresses or range tvl values are provided.
    #[clap(long, default_value = "10")]
    min_tvl: f64,

    /// Specifies the lower bound of the TVL threshold range. Denoted in the native token (e.g.
    /// Mainnet -> ETH). Components below this TVL will be removed from tracking.
    #[clap(long)]
    remove_tvl_threshold: Option<f64>,

    /// Specifies the upper bound of the TVL threshold range. Denoted in the native token (e.g.
    /// Mainnet -> ETH). Components above this TVL will be added to tracking.
    #[clap(long)]
    add_tvl_threshold: Option<f64>,

    /// Expected block time in seconds. For blockchains with consistent intervals,
    /// set to the average block time (e.g., "600" for a 10-minute interval).
    ///
    /// Adjusting `block_time` helps balance efficiency and responsiveness:
    /// - **Low values**: Increase sync frequency but may waste resources on retries.
    /// - **High values**: Reduce sync frequency but may delay updates on faster chains.
    #[clap(long, default_value = "600")]
    block_time: u64,

    /// Maximum wait time in seconds beyond the block time. Useful for handling
    /// chains with variable block intervals or network delays.
    #[clap(long, default_value = "1")]
    timeout: u64,

    /// Logging folder path.
    #[clap(long, default_value = "logs")]
    log_folder: String,

    /// Run the example on a single block with UniswapV2 and UniswapV3.
    #[clap(long)]
    example: bool,

    /// If set, only component and tokens are streamed, any snapshots or state updates
    /// are omitted from the stream.
    #[clap(long)]
    no_state: bool,

    /// Maximum amount of messages to process before exiting. Useful for debugging e.g.
    /// to easily get a state sync messages for a fixture. Alternatively this may be
    /// used to trigger a regular restart or resync.
    #[clap(short='n', long, default_value=None)]
    max_messages: Option<usize>,

    /// Maximum blocks an exchange can be absent for before it is marked as stale. Used
    /// in conjunction with block_time to calculate a timeout: block_time * max_missed_blocks.
    #[clap(long, default_value = "10")]
    max_missed_blocks: u64,

    /// If set, the synchronizer will include TVL in the messages.
    /// Enabling this option will increase the number of network requests made during start-up,
    /// which may result in increased start-up latency.
    #[clap(long)]
    include_tvl: bool,

    /// If set, disable compression for WebSocket messages.
    /// By default, messages are compressed using zstd.
    #[clap(long)]
    disable_compression: bool,

    /// If set, enables receiving partial block updates (flashblocks).
    /// This allows the client to receive incremental updates within a block, allowing for
    /// lower latency.
    #[clap(long)]
    partial_blocks: bool,

    /// Enable verbose logging. This will show more detailed information about the
    /// synchronization process and any errors that occur.
    #[clap(long)]
    verbose: bool,

    /// Maximum number of retry attempts for failed startups
    #[clap(long, default_value = "32")]
    max_retries: u64,

    /// Path to a TOML file containing component IDs to exclude from tracking.
    #[clap(long)]
    blocklist_config: Option<std::path::PathBuf>,
}

impl CliArgs {
    fn validate(&self) -> Result<(), String> {
        // TVL thresholds must be set together - either both or neither
        match (self.remove_tvl_threshold, self.add_tvl_threshold) {
            (Some(remove), Some(add)) if remove >= add => {
                return Err("remove_tvl_threshold must be less than add_tvl_threshold".to_string());
            }
            (Some(_), None) | (None, Some(_)) => {
                return Err(
                    "Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string()
                );
            }
            _ => {}
        }

        Ok(())
    }
}

#[derive(serde::Deserialize)]
struct BlocklistFile {
    ids: Vec<String>,
}

fn load_blocklist(path: &Path) -> Result<Vec<String>, String> {
    let content = std::fs::read_to_string(path)
        .map_err(|e| format!("Failed to read blocklist file {}: {e}", path.display()))?;
    let file: BlocklistFile = toml::from_str(&content)
        .map_err(|e| format!("Failed to parse blocklist file {}: {e}", path.display()))?;
    Ok(file.ids)
}

pub async fn run_cli() -> Result<(), String> {
    // Parse CLI Args
    let args: CliArgs = CliArgs::parse();
    args.validate()?;

    // Setup Logging
    let log_level = if args.verbose { "debug" } else { "info" };
    let (non_blocking, _guard) =
        tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
    let subscriber = tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level)),
        )
        .with_writer(non_blocking)
        .finish();

    tracing::subscriber::set_global_default(subscriber)
        .map_err(|e| format!("Failed to set up logging subscriber: {e}"))?;

    // Build the list of exchanges.  When --example is provided, we seed the list with a fixed
    // pair of well-known pools, otherwise we parse user supplied values (either plain exchange
    // names or exchange-pool pairs in the {exchange}-{pool_address} format).
    let exchanges: Vec<(String, Option<String>)> = if args.example {
        // You will need to port-forward tycho to run the example:
        //
        // ```bash
        // kubectl port-forward -n dev-tycho deploy/tycho-indexer 8888:4242
        // ```
        vec![
            (
                "uniswap_v3".to_string(),
                Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
            ),
            (
                "uniswap_v2".to_string(),
                Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
            ),
        ]
    } else {
        args.exchange
            .iter()
            .filter_map(|e| {
                if e.contains('-') {
                    let parts: Vec<&str> = e.split('-').collect();
                    if parts.len() == 2 {
                        Some((parts[0].to_string(), Some(parts[1].to_string())))
                    } else {
                        warn!("Ignoring invalid exchange format: {}", e);
                        None
                    }
                } else {
                    Some((e.to_string(), None))
                }
            })
            .collect()
    };

    info!("Running with exchanges: {:?}", exchanges);

    run(exchanges, args).await?;
    Ok(())
}

async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<(), String> {
    let blocklist = match &args.blocklist_config {
        Some(path) => load_blocklist(path)?,
        None => Vec::new(),
    };

    info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
    //TODO: remove "or args.auth_key.is_none()" when our internal client use the no_tls flag
    let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
        info!("Using non-secure connection: ws:// and http://");
        let tycho_ws_url = format!("ws://{url}", url = &args.tycho_url);
        let tycho_rpc_url = format!("http://{url}", url = &args.tycho_url);
        (tycho_ws_url, tycho_rpc_url)
    } else {
        info!("Using secure connection: wss:// and https://");
        let tycho_ws_url = format!("wss://{url}", url = &args.tycho_url);
        let tycho_rpc_url = format!("https://{url}", url = &args.tycho_url);
        (tycho_ws_url, tycho_rpc_url)
    };

    let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref())
        .map_err(|e| format!("Failed to create WebSocket client: {e}"))?;
    let rpc_client = HttpRPCClient::new(
        &tycho_rpc_url,
        HttpRPCClientOptions::new()
            .with_auth_key(args.auth_key.clone())
            .with_compression(!args.disable_compression),
    )
    .map_err(|e| format!("Failed to create RPC client: {e}"))?;
    let chain = Chain::from_str(&args.chain)
        .map_err(|_| format!("Unknown chain: {chain}", chain = &args.chain))?;
    let ws_jh = ws_client
        .connect()
        .await
        .map_err(|e| format!("WebSocket client connection error: {e}"))?;

    let mut block_sync = BlockSynchronizer::new(
        Duration::from_secs(args.block_time),
        Duration::from_secs(args.timeout),
        args.max_missed_blocks,
    );

    if let Some(mm) = &args.max_messages {
        block_sync.max_messages(*mm);
    }

    let requested_protocol_set: HashSet<_> = exchanges
        .iter()
        .map(|(name, _)| name.clone())
        .collect();
    let protocol_info =
        ProtocolSystemsInfo::fetch(&rpc_client, chain, &requested_protocol_set).await;
    protocol_info.log_other_available();
    let dci_protocols = protocol_info.dci_protocols;

    for (name, address) in exchanges {
        debug!("Registering exchange: {}", name);
        let id = ExtractorIdentity { chain, name: name.clone() };
        let filter = if let Some(address) = address {
            ComponentFilter::Ids(vec![address])
        } else if let (Some(remove_tvl), Some(add_tvl)) =
            (args.remove_tvl_threshold, args.add_tvl_threshold)
        {
            ComponentFilter::with_tvl_range(remove_tvl, add_tvl)
        } else {
            ComponentFilter::with_tvl_range(args.min_tvl, args.min_tvl)
        }
        .blocklist(blocklist.clone());
        let uses_dci = dci_protocols.contains(&name);
        let sync = ProtocolStateSynchronizer::new(
            id.clone(),
            true,
            filter,
            args.max_retries,
            Duration::from_secs(args.block_time / 2),
            !args.no_state,
            args.include_tvl,
            !args.disable_compression,
            rpc_client.clone(),
            ws_client.clone(),
            args.block_time + args.timeout,
        )
        .with_dci(uses_dci)
        .with_partial_blocks(args.partial_blocks);
        block_sync = block_sync.register_synchronizer(id, sync);
    }

    let (sync_jh, mut rx) = block_sync
        .run()
        .await
        .map_err(|e| format!("Failed to start block synchronizer: {e}"))?;

    let msg_printer = tokio::spawn(async move {
        while let Some(result) = rx.recv().await {
            let msg =
                result.map_err(|e| format!("Message printer received synchronizer error: {e}"))?;

            if let Ok(msg_json) = serde_json::to_string(&msg) {
                println!("{msg_json}");
            } else {
                // Log the error but continue processing further messages.
                error!("Failed to serialize FeedMessage");
            };
        }

        Ok::<(), String>(())
    });

    // Monitor the WebSocket, BlockSynchronizer and message printer futures.
    let (failed_task, shutdown_reason) = tokio::select! {
        res = ws_jh => (
            "WebSocket",
            extract_nested_error(res)
        ),
        res = sync_jh => (
            "BlockSynchronizer",
            extract_nested_error::<_, _, String>(Ok(res))
            ),
        res = msg_printer => (
            "MessagePrinter",
            extract_nested_error(res)
        )
    };

    debug!("RX closed");
    Err(format!(
        "{failed_task} task terminated: {}",
        shutdown_reason.unwrap_or("unknown reason".to_string())
    ))
}

#[inline]
fn extract_nested_error<T, E1: ToString, E2: ToString>(
    res: Result<Result<T, E1>, E2>,
) -> Option<String> {
    res.map_err(|e| e.to_string())
        .and_then(|r| r.map_err(|e| e.to_string()))
        .err()
}

#[cfg(test)]
mod cli_tests {
    use clap::Parser;

    use super::CliArgs;

    #[tokio::test]
    async fn test_cli_args() {
        let args = CliArgs::parse_from([
            "tycho-client",
            "--tycho-url",
            "localhost:5000",
            "--exchange",
            "uniswap_v2",
            "--min-tvl",
            "3000",
            "--block-time",
            "50",
            "--timeout",
            "5",
            "--log-folder",
            "test_logs",
            "--example",
            "--max-messages",
            "1",
            "--blocklist-config",
            "blocklist.toml",
        ]);
        let exchanges: Vec<String> = vec!["uniswap_v2".to_string()];
        assert_eq!(args.tycho_url, "localhost:5000");
        assert_eq!(args.exchange, exchanges);
        assert_eq!(args.min_tvl, 3000.0);
        assert_eq!(args.block_time, 50);
        assert_eq!(args.timeout, 5);
        assert_eq!(args.log_folder, "test_logs");
        assert_eq!(args.max_messages, Some(1));
        assert!(args.example);
        assert_eq!(args.disable_compression, false);
        assert_eq!(args.partial_blocks, false);
        assert_eq!(args.blocklist_config, Some(std::path::PathBuf::from("blocklist.toml")));
    }
}