use std::time::Duration;
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use anyhow::anyhow;
use clap::Parser;
use fynd_rpc::{
builder::{parse_chain, FyndRPCBuilder},
config::{defaults, BlocklistConfig, WorkerPoolsConfig},
protocols::fetch_protocol_systems,
};
mod cli;
use cli::{Cli, Commands};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::trace::TracerProvider;
use thiserror::Error;
use tokio::{
select,
signal::unix::{signal, SignalKind},
};
use tracing::{error, info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use tycho_simulation::utils::default_blocklist;
fn main() -> Result<(), anyhow::Error> {
let cli = Cli::parse();
match cli.command {
Commands::Openapi => {
use utoipa::OpenApi as _;
let spec = fynd_rpc::api::ApiDoc::openapi();
let json = serde_json::to_string_pretty(&spec).expect("spec serialization cannot fail");
println!("{json}");
Ok(())
}
Commands::Serve(serve_args) => {
run_solver(*serve_args).map_err(|e| anyhow!("{}", e))?;
Ok(())
}
}
}
#[derive(Debug, Error)]
pub enum SolverError {
#[error("setup error: {0}")]
SetupError(String),
#[error("solver runtime error: {0}")]
SolverRuntimeError(String),
#[error("shutdown error: {0}")]
ShutdownError(String),
}
fn create_tracing_subscriber() -> Option<TracerProvider> {
let fmt_layer = tracing_subscriber::fmt::layer()
.with_target(true)
.compact();
if let Ok(endpoint) = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") {
match opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint.clone())
.build()
{
Ok(exporter) => {
let provider = TracerProvider::builder()
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.with_resource(opentelemetry_sdk::Resource::new(vec![
opentelemetry::KeyValue::new("service.name", "fynd"),
]))
.build();
let otel_layer =
tracing_opentelemetry::layer().with_tracer(provider.tracer("fynd"));
tracing_subscriber::registry()
.with(EnvFilter::from_default_env())
.with(fmt_layer)
.with(otel_layer)
.init();
info!("OpenTelemetry tracing enabled, exporting to: {}", endpoint);
Some(provider)
}
Err(e) => {
tracing_subscriber::registry()
.with(EnvFilter::from_default_env())
.with(fmt_layer)
.init();
error!("Failed to build OTLP exporter: {}. Continuing without OTEL.", e);
None
}
}
} else {
tracing_subscriber::registry()
.with(EnvFilter::from_default_env())
.with(fmt_layer)
.init();
None
}
}
fn create_metrics_exporter() -> tokio::task::JoinHandle<()> {
let exporter_builder = PrometheusBuilder::new();
let handle = exporter_builder
.install_recorder()
.expect("Failed to install Prometheus recorder");
tokio::spawn(async move {
async fn metrics_handler(handle: PrometheusHandle) -> impl Responder {
let metrics = handle.render();
HttpResponse::Ok()
.content_type("text/plain; version=0.0.4; charset=utf-8")
.body(metrics)
}
if let Err(e) = HttpServer::new(move || {
App::new().route(
"/metrics",
web::get().to({
let handle = handle.clone();
move || metrics_handler(handle.clone())
}),
)
})
.bind(("0.0.0.0", 9898))
.expect("Failed to bind metrics server")
.run()
.await
{
error!("Metrics server failed: {}", e);
}
})
}
async fn setup_solver(args: &cli::ServeArgs) -> Result<fynd_rpc::builder::FyndRPC, SolverError> {
let default_path = std::path::Path::new("worker_pools.toml");
let pools_config =
if args.worker_pools_config == default_path && !args.worker_pools_config.exists() {
warn!(
"worker_pools.toml not found; using built-in defaults. \
Set --worker-pools-config or WORKER_POOLS_CONFIG to use a custom config."
);
WorkerPoolsConfig::builtin_default()
} else {
WorkerPoolsConfig::load_from_file(&args.worker_pools_config).map_err(|e| {
SolverError::SetupError(format!("failed to load worker pools config: {}", e))
})?
};
let chain = parse_chain(&args.chain)
.map_err(|e| SolverError::SetupError(format!("failed to parse chain: {}", e)))?;
let tycho_url = match &args.tycho_url {
Some(url) => url.clone(),
None => {
let default =
defaults::default_tycho_url(&args.chain).map_err(SolverError::SetupError)?;
info!("No --tycho-url provided. Using default for {}: {}", args.chain, default);
default.to_string()
}
};
let rpc_url = match &args.rpc_url {
Some(url) => url.clone(),
None => {
warn!(
"No --rpc-url provided. Using public endpoint: {}. \
For production use, provide a dedicated RPC endpoint.",
defaults::DEFAULT_RPC_URL
);
defaults::DEFAULT_RPC_URL.to_string()
}
};
let needs_fetch = args.protocols.is_empty() ||
args.protocols
.iter()
.any(|p| p == "all_onchain");
let protocols = if needs_fetch {
let mut fetched = fetch_protocol_systems(
&tycho_url,
args.tycho_api_key.as_deref(),
!args.disable_tls,
chain,
)
.await
.map_err(|e| SolverError::SetupError(format!("failed to fetch protocol systems: {}", e)))?;
for p in &args.protocols {
if p != "all_onchain" && !fetched.contains(p) {
fetched.push(p.clone());
}
}
fetched
} else {
args.protocols.clone()
};
if protocols.is_empty() {
return Err(SolverError::SetupError(
"no supported protocols found. Provide --protocols or check Tycho connectivity."
.to_string(),
));
}
info!(?protocols, "starting with {} protocol(s)", protocols.len());
let mut builder =
FyndRPCBuilder::new(chain, pools_config.into_pools(), tycho_url, rpc_url, protocols)
.http_host(args.http_host.clone())
.http_port(args.http_port)
.min_tvl(args.min_tvl)
.min_token_quality(args.min_token_quality)
.traded_n_days_ago(args.traded_n_days_ago)
.tvl_buffer_ratio(args.tvl_buffer_ratio)
.gas_refresh_interval(Duration::from_secs(args.gas_refresh_interval_secs))
.reconnect_delay(Duration::from_secs(args.reconnect_delay_secs))
.worker_router_timeout(Duration::from_millis(args.worker_router_timeout_ms))
.worker_router_min_responses(args.worker_router_min_responses)
.gas_price_stale_threshold(
args.gas_price_stale_threshold_secs
.map(Duration::from_secs),
);
if args.disable_tls {
builder = builder.disable_tls();
}
if let Some(api_key) = &args.tycho_api_key {
builder = builder.tycho_api_key(api_key.clone());
}
let blocklist = match &args.blocklist_config {
Some(path) => BlocklistConfig::load_from_file(path)
.map_err(|e| {
SolverError::SetupError(format!("failed to load blocklist config: {}", e))
})?
.into_components(),
None => default_blocklist(),
};
builder = builder.blocklist(blocklist);
builder = builder.with_price_guard_config(
fynd_core::PriceGuardConfig::default()
.with_enabled(args.enable_price_guard)
.with_lower_tolerance_bps(args.price_guard_lower_tolerance_bps)
.with_upper_tolerance_bps(args.price_guard_upper_tolerance_bps)
.with_fail_on_provider_error(args.price_guard_fail_on_provider_error)
.with_fail_on_token_price_not_found(args.price_guard_fail_on_token_price_not_found),
);
let solver = builder
.build()
.map_err(|e| SolverError::SetupError(format!("failed to start solver: {}", e)))?;
Ok(solver)
}
#[tokio::main]
async fn run_solver(args: cli::ServeArgs) -> Result<(), SolverError> {
let provider = create_tracing_subscriber();
info!("Starting Fynd");
let _metrics_task = create_metrics_exporter();
let solver = tokio::select! {
result = setup_solver(&args) => result?,
_ = tokio::signal::ctrl_c() => {
info!("SIGINT received during setup. Exiting.");
return Ok(());
}
};
let server_handle = solver.server_handle();
let shutdown_signal = tokio::spawn(async move {
let ctrl_c = tokio::signal::ctrl_c();
let mut sigterm = match signal(SignalKind::terminate()) {
Ok(sig) => sig,
Err(e) => {
error!("Failed to register SIGTERM handler: {}", e);
return Err(SolverError::SetupError(format!(
"failed to register signal handler: {}",
e
)));
}
};
select! {
_ = ctrl_c => {
info!("SIGINT (Ctrl+C) received. Stopping server...");
}
_ = sigterm.recv() => {
info!("SIGTERM received. Stopping server...");
}
}
server_handle.stop(true).await;
Ok::<(), SolverError>(())
});
select! {
result = solver.run() => {
if let Err(e) = result {
return Err(SolverError::SolverRuntimeError(e.to_string()));
}
}
result = shutdown_signal => {
if let Err(e) = result {
return Err(SolverError::ShutdownError(e.to_string()));
}
}
}
if let Some(provider) = provider {
let _ = provider.shutdown();
}
Ok(())
}