use std::{collections::HashSet, path::Path, str::FromStr, time::Duration};
use clap::Parser;
use tracing::{debug, error, info, warn};
use tracing_appender::rolling;
use tycho_common::dto::{Chain, ExtractorIdentity};
use crate::{
deltas::DeltasClient,
feed::{
component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
BlockSynchronizer,
},
rpc::HttpRPCClientOptions,
stream::ProtocolSystemsInfo,
HttpRPCClient, WsDeltasClient,
};
#[derive(Parser, Debug, Clone, PartialEq)]
#[clap(version = env!("CARGO_PKG_VERSION"))]
struct CliArgs {
#[clap(long, default_value = "localhost:4242", env = "TYCHO_URL")]
tycho_url: String,
#[clap(short = 'k', long, env = "TYCHO_AUTH_TOKEN")]
auth_key: Option<String>,
#[clap(long)]
no_tls: bool,
#[clap(short = 'c', long, default_value = "ethereum")]
pub chain: String,
#[clap(short = 'e', long, number_of_values = 1)]
exchange: Vec<String>,
#[clap(long, default_value = "10")]
min_tvl: f64,
#[clap(long)]
remove_tvl_threshold: Option<f64>,
#[clap(long)]
add_tvl_threshold: Option<f64>,
#[clap(long, default_value = "600")]
block_time: u64,
#[clap(long, default_value = "1")]
timeout: u64,
#[clap(long, default_value = "logs")]
log_folder: String,
#[clap(long)]
example: bool,
#[clap(long)]
no_state: bool,
#[clap(short='n', long, default_value=None)]
max_messages: Option<usize>,
#[clap(long, default_value = "10")]
max_missed_blocks: u64,
#[clap(long)]
include_tvl: bool,
#[clap(long)]
disable_compression: bool,
#[clap(long)]
partial_blocks: bool,
#[clap(long)]
verbose: bool,
#[clap(long, default_value = "32")]
max_retries: u64,
#[clap(long)]
blocklist_config: Option<std::path::PathBuf>,
}
impl CliArgs {
fn validate(&self) -> Result<(), String> {
match (self.remove_tvl_threshold, self.add_tvl_threshold) {
(Some(remove), Some(add)) if remove >= add => {
return Err("remove_tvl_threshold must be less than add_tvl_threshold".to_string());
}
(Some(_), None) | (None, Some(_)) => {
return Err(
"Both remove_tvl_threshold and add_tvl_threshold must be set.".to_string()
);
}
_ => {}
}
Ok(())
}
}
#[derive(serde::Deserialize)]
struct BlocklistFile {
ids: Vec<String>,
}
fn load_blocklist(path: &Path) -> Result<Vec<String>, String> {
let content = std::fs::read_to_string(path)
.map_err(|e| format!("Failed to read blocklist file {}: {e}", path.display()))?;
let file: BlocklistFile = toml::from_str(&content)
.map_err(|e| format!("Failed to parse blocklist file {}: {e}", path.display()))?;
Ok(file.ids)
}
pub async fn run_cli() -> Result<(), String> {
let args: CliArgs = CliArgs::parse();
args.validate()?;
let log_level = if args.verbose { "debug" } else { "info" };
let (non_blocking, _guard) =
tracing_appender::non_blocking(rolling::never(&args.log_folder, "dev_logs.log"));
let subscriber = tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level)),
)
.with_writer(non_blocking)
.finish();
tracing::subscriber::set_global_default(subscriber)
.map_err(|e| format!("Failed to set up logging subscriber: {e}"))?;
let exchanges: Vec<(String, Option<String>)> = if args.example {
vec![
(
"uniswap_v3".to_string(),
Some("0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640".to_string()),
),
(
"uniswap_v2".to_string(),
Some("0xa478c2975ab1ea89e8196811f51a7b7ade33eb11".to_string()),
),
]
} else {
args.exchange
.iter()
.filter_map(|e| {
if e.contains('-') {
let parts: Vec<&str> = e.split('-').collect();
if parts.len() == 2 {
Some((parts[0].to_string(), Some(parts[1].to_string())))
} else {
warn!("Ignoring invalid exchange format: {}", e);
None
}
} else {
Some((e.to_string(), None))
}
})
.collect()
};
info!("Running with exchanges: {:?}", exchanges);
run(exchanges, args).await?;
Ok(())
}
async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<(), String> {
let blocklist = match &args.blocklist_config {
Some(path) => load_blocklist(path)?,
None => Vec::new(),
};
info!("Running with version: {}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"));
let (tycho_ws_url, tycho_rpc_url) = if args.no_tls || args.auth_key.is_none() {
info!("Using non-secure connection: ws:// and http://");
let tycho_ws_url = format!("ws://{url}", url = &args.tycho_url);
let tycho_rpc_url = format!("http://{url}", url = &args.tycho_url);
(tycho_ws_url, tycho_rpc_url)
} else {
info!("Using secure connection: wss:// and https://");
let tycho_ws_url = format!("wss://{url}", url = &args.tycho_url);
let tycho_rpc_url = format!("https://{url}", url = &args.tycho_url);
(tycho_ws_url, tycho_rpc_url)
};
let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref())
.map_err(|e| format!("Failed to create WebSocket client: {e}"))?;
let rpc_client = HttpRPCClient::new(
&tycho_rpc_url,
HttpRPCClientOptions::new()
.with_auth_key(args.auth_key.clone())
.with_compression(!args.disable_compression),
)
.map_err(|e| format!("Failed to create RPC client: {e}"))?;
let chain = Chain::from_str(&args.chain)
.map_err(|_| format!("Unknown chain: {chain}", chain = &args.chain))?;
let ws_jh = ws_client
.connect()
.await
.map_err(|e| format!("WebSocket client connection error: {e}"))?;
let mut block_sync = BlockSynchronizer::new(
Duration::from_secs(args.block_time),
Duration::from_secs(args.timeout),
args.max_missed_blocks,
);
if let Some(mm) = &args.max_messages {
block_sync.max_messages(*mm);
}
let requested_protocol_set: HashSet<_> = exchanges
.iter()
.map(|(name, _)| name.clone())
.collect();
let protocol_info =
ProtocolSystemsInfo::fetch(&rpc_client, chain, &requested_protocol_set).await;
protocol_info.log_other_available();
let dci_protocols = protocol_info.dci_protocols;
for (name, address) in exchanges {
debug!("Registering exchange: {}", name);
let id = ExtractorIdentity { chain, name: name.clone() };
let filter = if let Some(address) = address {
ComponentFilter::Ids(vec![address])
} else if let (Some(remove_tvl), Some(add_tvl)) =
(args.remove_tvl_threshold, args.add_tvl_threshold)
{
ComponentFilter::with_tvl_range(remove_tvl, add_tvl)
} else {
ComponentFilter::with_tvl_range(args.min_tvl, args.min_tvl)
}
.blocklist(blocklist.clone());
let uses_dci = dci_protocols.contains(&name);
let sync = ProtocolStateSynchronizer::new(
id.clone(),
true,
filter,
args.max_retries,
Duration::from_secs(args.block_time / 2),
!args.no_state,
args.include_tvl,
!args.disable_compression,
rpc_client.clone(),
ws_client.clone(),
args.block_time + args.timeout,
)
.with_dci(uses_dci)
.with_partial_blocks(args.partial_blocks);
block_sync = block_sync.register_synchronizer(id, sync);
}
let (sync_jh, mut rx) = block_sync
.run()
.await
.map_err(|e| format!("Failed to start block synchronizer: {e}"))?;
let msg_printer = tokio::spawn(async move {
while let Some(result) = rx.recv().await {
let msg =
result.map_err(|e| format!("Message printer received synchronizer error: {e}"))?;
if let Ok(msg_json) = serde_json::to_string(&msg) {
println!("{msg_json}");
} else {
error!("Failed to serialize FeedMessage");
};
}
Ok::<(), String>(())
});
let (failed_task, shutdown_reason) = tokio::select! {
res = ws_jh => (
"WebSocket",
extract_nested_error(res)
),
res = sync_jh => (
"BlockSynchronizer",
extract_nested_error::<_, _, String>(Ok(res))
),
res = msg_printer => (
"MessagePrinter",
extract_nested_error(res)
)
};
debug!("RX closed");
Err(format!(
"{failed_task} task terminated: {}",
shutdown_reason.unwrap_or("unknown reason".to_string())
))
}
#[inline]
fn extract_nested_error<T, E1: ToString, E2: ToString>(
res: Result<Result<T, E1>, E2>,
) -> Option<String> {
res.map_err(|e| e.to_string())
.and_then(|r| r.map_err(|e| e.to_string()))
.err()
}
#[cfg(test)]
mod cli_tests {
use clap::Parser;
use super::CliArgs;
#[tokio::test]
async fn test_cli_args() {
let args = CliArgs::parse_from([
"tycho-client",
"--tycho-url",
"localhost:5000",
"--exchange",
"uniswap_v2",
"--min-tvl",
"3000",
"--block-time",
"50",
"--timeout",
"5",
"--log-folder",
"test_logs",
"--example",
"--max-messages",
"1",
"--blocklist-config",
"blocklist.toml",
]);
let exchanges: Vec<String> = vec!["uniswap_v2".to_string()];
assert_eq!(args.tycho_url, "localhost:5000");
assert_eq!(args.exchange, exchanges);
assert_eq!(args.min_tvl, 3000.0);
assert_eq!(args.block_time, 50);
assert_eq!(args.timeout, 5);
assert_eq!(args.log_folder, "test_logs");
assert_eq!(args.max_messages, Some(1));
assert!(args.example);
assert_eq!(args.disable_compression, false);
assert_eq!(args.partial_blocks, false);
assert_eq!(args.blocklist_config, Some(std::path::PathBuf::from("blocklist.toml")));
}
}