use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use actix_web::{dev::ServerHandle, App, HttpServer};
use anyhow::{Context, Result};
use fynd_core::{encoding::encoder::Encoder, worker_pool::pool::WorkerPool, FyndBuilder};
use tokio::task::JoinHandle;
use tracing::{error, info, warn};
use tycho_simulation::tycho_common::models::Chain;
use crate::{
api::{configure_app, AppState, HealthTracker},
config::{defaults, PoolConfig},
};
/// Builder for a [`FyndRPC`] instance: a solver plus the HTTP server that exposes it.
///
/// Solver-related options are forwarded to the wrapped `FyndBuilder`; the HTTP bind address
/// and the gas-price staleness threshold are kept here and applied in `build`.
#[must_use]
pub struct FyndRPCBuilder {
/// Underlying solver builder that receives every forwarded option.
fynd_builder: FyndBuilder,
/// Host the HTTP server binds to (initialised from `defaults::HTTP_HOST`).
http_host: String,
/// Port the HTTP server binds to (initialised from `defaults::HTTP_PORT`).
http_port: u16,
/// Optional threshold handed to the health tracker via `with_gas_price_stale_threshold`.
gas_price_stale_threshold: Option<Duration>,
}
impl FyndRPCBuilder {
/// Creates a builder with every configured pool registered and default HTTP settings.
///
/// The underlying `FyndBuilder` is created with `defaults::MIN_TVL`, receives one
/// `add_pool` call per entry of `pools`, and gets the default worker-router timeout
/// (`defaults::WORKER_ROUTER_TIMEOUT_MS`). Host and port start at `defaults::HTTP_HOST`
/// and `defaults::HTTP_PORT`; no gas-price staleness threshold is set.
pub fn new(
    chain: Chain,
    pools: HashMap<String, PoolConfig>,
    tycho_url: String,
    rpc_url: String,
    protocols: Vec<String>,
) -> Self {
    let mut solver = FyndBuilder::new(chain, tycho_url, rpc_url, protocols, defaults::MIN_TVL);
    // Register the pools one by one; each call consumes and returns the builder.
    for (pool_name, pool_cfg) in &pools {
        solver = solver.add_pool(pool_name, pool_cfg);
    }
    let default_timeout = Duration::from_millis(defaults::WORKER_ROUTER_TIMEOUT_MS);

    Self {
        fynd_builder: solver.worker_router_timeout(default_timeout),
        http_host: defaults::HTTP_HOST.to_owned(),
        http_port: defaults::HTTP_PORT,
        gas_price_stale_threshold: None,
    }
}
pub fn http_host(mut self, host: String) -> Self {
self.http_host = host;
self
}
pub fn http_port(mut self, port: u16) -> Self {
self.http_port = port;
self
}
pub fn min_tvl(mut self, min_tvl: f64) -> Self {
self.fynd_builder = self.fynd_builder.min_tvl(min_tvl);
self
}
pub fn min_token_quality(mut self, quality: i32) -> Self {
self.fynd_builder = self
.fynd_builder
.min_token_quality(quality);
self
}
pub fn traded_n_days_ago(mut self, days: u64) -> Self {
self.fynd_builder = self
.fynd_builder
.traded_n_days_ago(days);
self
}
pub fn tvl_buffer_ratio(mut self, ratio: f64) -> Self {
self.fynd_builder = self
.fynd_builder
.tvl_buffer_ratio(ratio);
self
}
pub fn gas_refresh_interval(mut self, interval: Duration) -> Self {
self.fynd_builder = self
.fynd_builder
.gas_refresh_interval(interval);
self
}
pub fn reconnect_delay(mut self, delay: Duration) -> Self {
self.fynd_builder = self.fynd_builder.reconnect_delay(delay);
self
}
pub fn worker_router_timeout(mut self, timeout: Duration) -> Self {
self.fynd_builder = self
.fynd_builder
.worker_router_timeout(timeout);
self
}
pub fn worker_router_min_responses(mut self, min: usize) -> Self {
self.fynd_builder = self
.fynd_builder
.worker_router_min_responses(min);
self
}
pub fn tycho_api_key(mut self, key: String) -> Self {
self.fynd_builder = self.fynd_builder.tycho_api_key(key);
self
}
pub fn disable_tls(mut self) -> Self {
self.fynd_builder = self.fynd_builder.tycho_use_tls(false);
self
}
pub fn blocklist(mut self, blocklist: HashSet<String>) -> Self {
self.fynd_builder = self
.fynd_builder
.blocklisted_components(blocklist);
self
}
pub fn encoder(mut self, encoder: Encoder) -> Self {
self.fynd_builder = self.fynd_builder.encoder(encoder);
self
}
pub fn gas_price_stale_threshold(mut self, threshold: Option<Duration>) -> Self {
self.gas_price_stale_threshold = threshold;
self
}
pub fn price_guard_enabled(mut self, enabled: bool) -> Self {
self.fynd_builder = self
.fynd_builder
.price_guard_enabled(enabled);
self
}
pub fn build(self) -> Result<FyndRPC> {
info!(
host = %self.http_host,
port = self.http_port,
"starting fynd"
);
let parts = self
.fynd_builder
.build()
.map_err(|e| anyhow::anyhow!("{}", e))?
.into_parts();
for pool in parts.worker_pools() {
info!(
name = %pool.name(),
algorithm = %pool.algorithm(),
num_workers = pool.num_workers(),
"worker pool started"
);
}
let chain = parts.chain();
let chain_id = chain.id();
let router_address = parts.router_address().clone();
let permit2_address = {
use fynd_core::encoding::encoder::PERMIT2_ADDRESS;
let hex = PERMIT2_ADDRESS
.strip_prefix("0x")
.unwrap_or(PERMIT2_ADDRESS);
hex::decode(hex)
.context("failed to decode PERMIT2_ADDRESS")?
.into()
};
let health_tracker =
HealthTracker::new(Arc::clone(parts.market_data()), Arc::clone(parts.derived_data()))
.with_gas_price_stale_threshold(self.gas_price_stale_threshold);
#[cfg(feature = "experimental")]
let gas_token = {
use fynd_core::types::constants::native_token;
native_token(&chain).context("gas token not configured for chain")?
};
let (
router,
worker_pools,
_market_data,
_derived_data,
feed_handle,
gas_price_handle,
computation_handle,
computation_shutdown_tx,
) = parts.into_components();
let app_state = AppState::new(
router,
health_tracker,
chain_id,
router_address,
permit2_address,
#[cfg(feature = "experimental")]
Arc::clone(&_derived_data),
#[cfg(feature = "experimental")]
gas_token,
);
let server = HttpServer::new(move || {
App::new()
.wrap(tracing_actix_web::TracingLogger::default())
.configure(|cfg| configure_app(cfg, app_state.clone()))
})
.bind((self.http_host.as_str(), self.http_port))
.context("failed to bind HTTP server")?
.run();
let server_handle = server.handle();
let server_task = tokio::spawn(async move {
if let Err(e) = server.await {
tracing::error!(error = %e, "HTTP server error");
}
});
Ok(FyndRPC {
server_handle,
server_task,
worker_pools,
feed_handle,
gas_price_worker_handle: gas_price_handle,
computation_manager_handle: computation_handle,
computation_shutdown_tx,
})
}
}
/// A built solver with its HTTP server and background tasks, produced by
/// `FyndRPCBuilder::build`.
///
/// Call `run` to wait until one of the components stops and then shut the rest down.
#[must_use]
pub struct FyndRPC {
/// Handle used to stop the HTTP server; clones are handed out by `server_handle()`.
server_handle: ServerHandle,
/// Task that awaits the HTTP server future and logs its error, if any.
server_task: JoinHandle<()>,
/// Worker pools; each one is shut down at the end of `run`.
worker_pools: Vec<WorkerPool>,
/// Background feed task (referred to as the Tycho feed in `run`'s log messages).
feed_handle: JoinHandle<()>,
/// Background gas-price worker task.
gas_price_worker_handle: JoinHandle<()>,
/// Background computation-manager task.
computation_manager_handle: JoinHandle<()>,
/// Broadcast sender on which `run` sends `()` to signal computation shutdown.
computation_shutdown_tx: tokio::sync::broadcast::Sender<()>,
}
impl FyndRPC {
pub fn server_handle(&self) -> ServerHandle {
self.server_handle.clone()
}
/// Runs until any component stops, then shuts everything else down.
///
/// Waits on the HTTP server task, the feed task, the gas-price worker and the computation
/// manager at once. Whichever finishes first triggers shutdown of the remaining ones:
/// the HTTP server is stopped gracefully (unless it is the component that ended), the
/// remaining background tasks are aborted, a shutdown signal is broadcast to the
/// computation side, and finally every worker pool is shut down.
///
/// Always returns `Ok(())`; failures of individual components are logged, not returned.
pub async fn run(self) -> std::io::Result<()> {
    let FyndRPC {
        server_handle,
        mut server_task,
        worker_pools,
        mut feed_handle,
        mut gas_price_worker_handle,
        mut computation_manager_handle,
        computation_shutdown_tx,
    } = self;

    info!("HTTP server started");

    tokio::select! {
        server_result = &mut server_task => {
            if let Err(e) = server_result {
                error!(error = %e, "Server task error");
            }
            info!("shutting down: HTTP server stopped, aborting feed and computation");
            feed_handle.abort();
            gas_price_worker_handle.abort();
            let _ = computation_shutdown_tx.send(());
            computation_manager_handle.abort();
        }
        _ = &mut feed_handle => {
            error!("Tycho feed error detected, shutting down solver");
            server_handle.stop(true).await;
            server_task.await.ok();
            gas_price_worker_handle.abort();
            let _ = computation_shutdown_tx.send(());
            computation_manager_handle.abort();
            info!("shutting down: feed error path");
        }
        _ = &mut gas_price_worker_handle => {
            error!("Gas price worker error detected, shutting down solver");
            server_handle.stop(true).await;
            server_task.await.ok();
            feed_handle.abort();
            let _ = computation_shutdown_tx.send(());
            computation_manager_handle.abort();
            info!("shutting down: gas price error path");
        }
        _ = &mut computation_manager_handle => {
            warn!("Computation manager stopped unexpectedly");
            // Dropping the remaining `JoinHandle`s would only detach their tasks, leaving
            // the HTTP server, feed and gas-price worker running after `run` has returned
            // and the worker pools are gone. Stop them like the other branches do.
            server_handle.stop(true).await;
            server_task.await.ok();
            feed_handle.abort();
            gas_price_worker_handle.abort();
            // The manager itself is done; the send error for "no receivers" is ignored.
            let _ = computation_shutdown_tx.send(());
            info!("shutting down: computation manager stopped path");
        }
    }

    info!("shutting down worker pools");
    for pool in worker_pools {
        // `shutdown` consumes the pool, so keep the name for the second log line.
        let name = pool.name().to_owned();
        info!(name, "shutting down pool");
        pool.shutdown();
        info!(name, "pool shut down");
    }

    info!("shutdown complete");
    Ok(())
}
}
/// Parses a chain name (case-insensitively for ASCII letters) into a [`Chain`].
///
/// The lowercased name is deserialised as a JSON string value, so it must match the
/// serialised name `Chain` accepts.
///
/// # Errors
///
/// Returns an error naming the rejected input when it does not deserialise to a `Chain`.
pub fn parse_chain(chain: &str) -> Result<Chain> {
    // Build the JSON string as a value instead of splicing the input between quotes:
    // quotes, backslashes or `\uXXXX` sequences in the input are then treated as plain
    // characters and cannot be interpreted as JSON syntax.
    let candidate = serde_json::Value::String(chain.to_ascii_lowercase());
    serde_json::from_value::<Chain>(candidate)
        .map_err(|_| anyhow::anyhow!("unsupported chain '{}'. Try values like 'Ethereum'", chain))
}