use std::{collections::HashSet, sync::Arc, time::Duration};
use num_cpus;
use serde::{Deserialize, Serialize};
use tokio::{sync::broadcast, task::JoinHandle};
use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
use tycho_simulation::{
tycho_common::{models::Chain, Bytes},
tycho_ethereum::rpc::EthereumRpcClient,
};
use crate::{
algorithm::{AlgorithmConfig, AlgorithmError},
derived::{ComputationManager, ComputationManagerConfig, SharedDerivedDataRef},
encoding::encoder::Encoder,
feed::{
events::MarketEventHandler,
gas::GasPriceFetcher,
market_data::{SharedMarketData, SharedMarketDataRef},
tycho_feed::TychoFeed,
TychoFeedConfig,
},
graph::EdgeWeightUpdaterWithDerived,
price_guard::{
guard::PriceGuard, provider::PriceProvider, provider_registry::PriceProviderRegistry,
},
types::constants::native_token,
worker_pool::{
pool::{WorkerPool, WorkerPoolBuilder},
registry::UnknownAlgorithmError,
},
worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter},
Algorithm, Quote, QuoteRequest, SolveError,
};
/// Default values used when a setting is not supplied explicitly.
///
/// The feed and router values seed a new `FyndBuilder`; the `POOL_*` values seed a new
/// `PoolConfig` and the pools registered through `FyndBuilder::algorithm` /
/// `FyndBuilder::with_algorithm`.
pub mod defaults {
    use std::time::Duration;

    // ---- Feed settings -------------------------------------------------------------------

    /// Initial `min_token_quality` of a builder.
    pub const MIN_TOKEN_QUALITY: i32 = 100;

    /// Initial `traded_n_days_ago` of a builder.
    pub const TRADED_N_DAYS_AGO: u64 = 3;

    /// Initial `tvl_buffer_ratio` of a builder.
    pub const TVL_BUFFER_RATIO: f64 = 1.1;

    /// Initial `gas_refresh_interval` of a builder (30 seconds).
    pub const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(30);

    /// Initial `reconnect_delay` of a builder (5 seconds).
    pub const RECONNECT_DELAY: Duration = Duration::from_secs(5);

    // ---- Router settings -----------------------------------------------------------------

    /// Initial `router_min_responses` of a builder.
    pub const ROUTER_MIN_RESPONSES: usize = 0;

    // ---- Worker pool settings ------------------------------------------------------------

    /// Default task queue capacity of a worker pool.
    pub const POOL_TASK_QUEUE_CAPACITY: usize = 1_000;

    /// Default minimum hop count handed to the algorithm configuration.
    pub const POOL_MIN_HOPS: usize = 1;

    /// Default maximum hop count handed to the algorithm configuration.
    pub const POOL_MAX_HOPS: usize = 3;

    /// Default algorithm timeout, in milliseconds.
    pub const POOL_TIMEOUT_MS: u64 = 100;
}
// Private defaults that are not part of the public `defaults` module.

/// Initial `tycho_use_tls` of a new builder: TLS is on unless the caller turns it off.
const DEFAULT_TYCHO_USE_TLS: bool = true;

/// Depth slippage threshold handed to the computation manager configuration in `build`.
const DEFAULT_DEPTH_SLIPPAGE_THRESHOLD: f64 = 0.01;

/// Initial router timeout of a new builder (10 seconds).
const DEFAULT_ROUTER_TIMEOUT: Duration = Duration::from_secs(10);
/// Serde default for `PoolConfig::task_queue_capacity`
/// (returns `defaults::POOL_TASK_QUEUE_CAPACITY`).
fn default_task_queue_capacity() -> usize {
defaults::POOL_TASK_QUEUE_CAPACITY
}
/// Serde default for `PoolConfig::min_hops` (returns `defaults::POOL_MIN_HOPS`).
fn default_min_hops() -> usize {
defaults::POOL_MIN_HOPS
}
/// Serde default for `PoolConfig::max_hops` (returns `defaults::POOL_MAX_HOPS`).
fn default_max_hops() -> usize {
defaults::POOL_MAX_HOPS
}
/// Serde default for `PoolConfig::timeout_ms` (returns `defaults::POOL_TIMEOUT_MS`).
fn default_algo_timeout_ms() -> u64 {
defaults::POOL_TIMEOUT_MS
}
/// Settings for one built-in worker pool, as consumed by `FyndBuilder::add_pool`.
///
/// Only `algorithm` is required when deserializing; every other field carries a serde
/// default.
#[must_use]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolConfig {
/// Name of the algorithm the pool runs. Required: it has no serde default.
algorithm: String,
/// Number of workers; `num_cpus::get()` when omitted.
#[serde(default = "num_cpus::get")]
num_workers: usize,
/// Capacity of the pool's task queue; `defaults::POOL_TASK_QUEUE_CAPACITY` when omitted.
#[serde(default = "default_task_queue_capacity")]
task_queue_capacity: usize,
/// Minimum hop count passed to the algorithm configuration;
/// `defaults::POOL_MIN_HOPS` when omitted.
#[serde(default = "default_min_hops")]
min_hops: usize,
/// Maximum hop count passed to the algorithm configuration;
/// `defaults::POOL_MAX_HOPS` when omitted.
#[serde(default = "default_max_hops")]
max_hops: usize,
/// Algorithm timeout in milliseconds; `defaults::POOL_TIMEOUT_MS` when omitted.
#[serde(default = "default_algo_timeout_ms")]
timeout_ms: u64,
/// Optional route limit passed to the algorithm configuration; `None` when omitted.
#[serde(default)]
max_routes: Option<usize>,
}
impl PoolConfig {
    /// Creates a configuration for `algorithm` with every other setting at its default:
    /// one worker per CPU reported by `num_cpus::get()`, the `defaults::POOL_*` values, and
    /// no route limit.
    pub fn new(algorithm: impl Into<String>) -> Self {
        let algorithm = algorithm.into();
        let num_workers = num_cpus::get();
        Self {
            algorithm,
            num_workers,
            task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
            min_hops: defaults::POOL_MIN_HOPS,
            max_hops: defaults::POOL_MAX_HOPS,
            timeout_ms: defaults::POOL_TIMEOUT_MS,
            max_routes: None,
        }
    }

    // ---- Accessors -----------------------------------------------------------------------

    /// Name of the algorithm this pool runs.
    pub fn algorithm(&self) -> &str {
        self.algorithm.as_str()
    }

    /// Number of workers in the pool.
    pub fn num_workers(&self) -> usize {
        self.num_workers
    }

    /// Capacity of the pool's task queue.
    pub fn task_queue_capacity(&self) -> usize {
        self.task_queue_capacity
    }

    /// Minimum hop count.
    pub fn min_hops(&self) -> usize {
        self.min_hops
    }

    /// Maximum hop count.
    pub fn max_hops(&self) -> usize {
        self.max_hops
    }

    /// Algorithm timeout in milliseconds.
    pub fn timeout_ms(&self) -> u64 {
        self.timeout_ms
    }

    /// Optional route limit.
    pub fn max_routes(&self) -> Option<usize> {
        self.max_routes
    }

    // ---- Consuming setters ---------------------------------------------------------------

    /// Returns the configuration with `num_workers` replaced.
    pub fn with_num_workers(self, num_workers: usize) -> Self {
        Self { num_workers, ..self }
    }

    /// Returns the configuration with `task_queue_capacity` replaced.
    pub fn with_task_queue_capacity(self, task_queue_capacity: usize) -> Self {
        Self { task_queue_capacity, ..self }
    }

    /// Returns the configuration with `min_hops` replaced.
    pub fn with_min_hops(self, min_hops: usize) -> Self {
        Self { min_hops, ..self }
    }

    /// Returns the configuration with `max_hops` replaced.
    pub fn with_max_hops(self, max_hops: usize) -> Self {
        Self { max_hops, ..self }
    }

    /// Returns the configuration with `timeout_ms` replaced.
    pub fn with_timeout_ms(self, timeout_ms: u64) -> Self {
        Self { timeout_ms, ..self }
    }

    /// Returns the configuration with `max_routes` replaced (`None` removes the limit).
    pub fn with_max_routes(self, max_routes: Option<usize>) -> Self {
        Self { max_routes, ..self }
    }
}
/// Error returned by `Solver::wait_until_ready` when the timeout elapses before both the
/// market data and the derived data report ready.
#[derive(Debug, thiserror::Error)]
#[error("timed out after {timeout_ms}ms waiting for market data and derived computations")]
pub struct WaitReadyError {
/// The timeout that was exceeded, in milliseconds (used in the error message).
timeout_ms: u64,
}
/// Errors that `FyndBuilder::build` can return.
///
/// Marked `#[non_exhaustive]`, so downstream matches need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum SolverBuildError {
/// `EthereumRpcClient::new` failed; carries the stringified underlying error.
#[error("failed to create ethereum RPC client: {0}")]
RpcClient(String),
/// An `AlgorithmError` propagated with `?`; `build` applies `?` to `AlgorithmConfig::new`.
#[error(transparent)]
AlgorithmConfig(#[from] AlgorithmError),
/// `ComputationManager::new` failed; carries the stringified underlying error.
#[error("failed to create computation manager: {0}")]
ComputationManager(String),
/// Building the default swap encoder registry or the `Encoder` failed; carries the
/// stringified underlying error.
#[error("failed to create encoder: {0}")]
Encoder(String),
/// An `UnknownAlgorithmError` propagated with `?`.
// NOTE(review): presumably raised by `WorkerPoolBuilder::build` for an unregistered
// algorithm name; confirm against the worker pool registry.
#[error(transparent)]
UnknownAlgorithm(#[from] UnknownAlgorithmError),
/// `native_token` returned an error for the configured chain.
#[error("gas token not configured for chain")]
GasToken,
/// `build` was called before any worker pool was registered.
#[error("no worker pools configured")]
NoPools,
}
/// A worker pool queued on the builder, turned into a `WorkerPool` by `FyndBuilder::build`.
enum PoolEntry {
/// A pool whose algorithm is selected by name via `WorkerPoolBuilder::algorithm`.
BuiltIn {
// Pool name passed to `WorkerPoolBuilder::name`.
name: String,
// Algorithm name passed to `WorkerPoolBuilder::algorithm`.
algorithm: String,
num_workers: usize,
task_queue_capacity: usize,
// `min_hops`, `max_hops`, `timeout_ms` (as milliseconds) and `max_routes` are the
// arguments of `AlgorithmConfig::new`.
min_hops: usize,
max_hops: usize,
timeout_ms: u64,
max_routes: Option<usize>,
},
/// A pool whose algorithm is installed by a caller-supplied closure.
Custom(CustomPoolEntry),
}
/// Settings for a pool registered through `FyndBuilder::with_algorithm`.
struct CustomPoolEntry {
/// Pool name; `with_algorithm` also uses it as the registered algorithm name.
name: String,
num_workers: usize,
task_queue_capacity: usize,
// `min_hops`, `max_hops`, `timeout_ms` (as milliseconds) and `max_routes` are the
// arguments of `AlgorithmConfig::new`.
min_hops: usize,
max_hops: usize,
timeout_ms: u64,
max_routes: Option<usize>,
/// One-shot hook applied to the `WorkerPoolBuilder` in `build`, after name, algorithm
/// config, worker count and queue capacity are set and before the pool is built.
configure: Box<dyn FnOnce(WorkerPoolBuilder) -> WorkerPoolBuilder + Send>,
}
/// Builder that collects feed, router, encoder, worker-pool and price-guard settings and
/// assembles a `Solver` in `build`.
#[must_use = "a builder does nothing until .build() is called"]
pub struct FyndBuilder {
/// Target chain; used for the feed config, the native gas token and the default encoder.
chain: Chain,
/// Tycho endpoint passed to `TychoFeedConfig::new`.
tycho_url: String,
/// RPC endpoint passed to `EthereumRpcClient::new`.
rpc_url: String,
/// Protocol list passed to `TychoFeedConfig::new`.
protocols: Vec<String>,
/// Minimum TVL passed to `TychoFeedConfig::new`.
min_tvl: f64,
/// Optional Tycho API key; `None` by default.
tycho_api_key: Option<String>,
/// TLS flag passed to `TychoFeedConfig::new`; `true` by default.
tycho_use_tls: bool,
// The next six settings are forwarded to the same-named `TychoFeedConfig` setters.
min_token_quality: i32,
traded_n_days_ago: u64,
tvl_buffer_ratio: f64,
gas_refresh_interval: Duration,
reconnect_delay: Duration,
blocklisted_components: HashSet<String>,
/// Timeout applied to `WorkerPoolRouterConfig`.
router_timeout: Duration,
/// Minimum response count applied to `WorkerPoolRouterConfig`.
router_min_responses: usize,
/// Caller-supplied encoder; when `None`, `build` creates one from the default swap
/// encoder registry.
encoder: Option<Encoder>,
/// Worker pools to create, in registration order. `build` fails when this is empty.
pools: Vec<PoolEntry>,
/// Whether `build` attaches a `PriceGuard` to the router; `false` by default.
price_guard_enabled: bool,
/// Registered price providers; `build` registers the default providers when empty.
price_providers: Vec<Box<dyn PriceProvider>>,
}
impl FyndBuilder {
pub fn new(
chain: Chain,
tycho_url: impl Into<String>,
rpc_url: impl Into<String>,
protocols: Vec<String>,
min_tvl: f64,
) -> Self {
Self {
chain,
tycho_url: tycho_url.into(),
rpc_url: rpc_url.into(),
protocols,
min_tvl,
tycho_api_key: None,
tycho_use_tls: DEFAULT_TYCHO_USE_TLS,
min_token_quality: defaults::MIN_TOKEN_QUALITY,
traded_n_days_ago: defaults::TRADED_N_DAYS_AGO,
tvl_buffer_ratio: defaults::TVL_BUFFER_RATIO,
gas_refresh_interval: defaults::GAS_REFRESH_INTERVAL,
reconnect_delay: defaults::RECONNECT_DELAY,
blocklisted_components: HashSet::new(),
router_timeout: DEFAULT_ROUTER_TIMEOUT,
router_min_responses: defaults::ROUTER_MIN_RESPONSES,
encoder: None,
pools: Vec::new(),
price_guard_enabled: false,
price_providers: Vec::new(),
}
}
pub fn chain(&self) -> Chain {
self.chain
}
pub fn tycho_api_key(mut self, key: impl Into<String>) -> Self {
self.tycho_api_key = Some(key.into());
self
}
pub fn min_tvl(mut self, min_tvl: f64) -> Self {
self.min_tvl = min_tvl;
self
}
pub fn tycho_use_tls(mut self, use_tls: bool) -> Self {
self.tycho_use_tls = use_tls;
self
}
pub fn min_token_quality(mut self, quality: i32) -> Self {
self.min_token_quality = quality;
self
}
pub fn traded_n_days_ago(mut self, days: u64) -> Self {
self.traded_n_days_ago = days;
self
}
pub fn tvl_buffer_ratio(mut self, ratio: f64) -> Self {
self.tvl_buffer_ratio = ratio;
self
}
pub fn gas_refresh_interval(mut self, interval: Duration) -> Self {
self.gas_refresh_interval = interval;
self
}
pub fn reconnect_delay(mut self, delay: Duration) -> Self {
self.reconnect_delay = delay;
self
}
pub fn blocklisted_components(mut self, components: HashSet<String>) -> Self {
self.blocklisted_components = components;
self
}
pub fn worker_router_timeout(mut self, timeout: Duration) -> Self {
self.router_timeout = timeout;
self
}
pub fn worker_router_min_responses(mut self, min: usize) -> Self {
self.router_min_responses = min;
self
}
pub fn encoder(mut self, encoder: Encoder) -> Self {
self.encoder = Some(encoder);
self
}
pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
self.pools.push(PoolEntry::BuiltIn {
name: "default".to_string(),
algorithm: algorithm.into(),
num_workers: num_cpus::get(),
task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
min_hops: defaults::POOL_MIN_HOPS,
max_hops: defaults::POOL_MAX_HOPS,
timeout_ms: defaults::POOL_TIMEOUT_MS,
max_routes: None,
});
self
}
pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
where
A: Algorithm + 'static,
A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
{
let name = name.into();
let algo_name = name.clone();
let configure =
Box::new(move |builder: WorkerPoolBuilder| builder.with_algorithm(algo_name, factory));
self.pools
.push(PoolEntry::Custom(CustomPoolEntry {
name,
num_workers: num_cpus::get(),
task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
min_hops: defaults::POOL_MIN_HOPS,
max_hops: defaults::POOL_MAX_HOPS,
timeout_ms: defaults::POOL_TIMEOUT_MS,
max_routes: None,
configure,
}));
self
}
pub fn add_default_price_providers(self) -> Self {
self.register_price_provider(Box::new(
crate::price_guard::hyperliquid::HyperliquidProvider::default(),
))
.register_price_provider(Box::new(
crate::price_guard::binance_ws::BinanceWsProvider::default(),
))
}
pub fn register_price_provider(mut self, provider: Box<dyn PriceProvider>) -> Self {
self.price_providers.push(provider);
self
}
pub fn price_guard_enabled(mut self, enabled: bool) -> Self {
self.price_guard_enabled = enabled;
self
}
pub fn add_pool(mut self, name: impl Into<String>, config: &PoolConfig) -> Self {
self.pools.push(PoolEntry::BuiltIn {
name: name.into(),
algorithm: config.algorithm().to_string(),
num_workers: config.num_workers(),
task_queue_capacity: config.task_queue_capacity(),
min_hops: config.min_hops(),
max_hops: config.max_hops(),
timeout_ms: config.timeout_ms(),
max_routes: config.max_routes(),
});
self
}
pub fn build(mut self) -> Result<Solver, SolverBuildError> {
if self.pools.is_empty() {
return Err(SolverBuildError::NoPools);
}
if self.price_providers.is_empty() {
self = self.add_default_price_providers();
}
let market_data = Arc::new(tokio::sync::RwLock::new(SharedMarketData::new()));
let tycho_feed_config = TychoFeedConfig::new(
self.tycho_url,
self.chain,
self.tycho_api_key,
self.tycho_use_tls,
self.protocols,
self.min_tvl,
)
.tvl_buffer_ratio(self.tvl_buffer_ratio)
.gas_refresh_interval(self.gas_refresh_interval)
.reconnect_delay(self.reconnect_delay)
.min_token_quality(self.min_token_quality)
.traded_n_days_ago(self.traded_n_days_ago)
.blocklisted_components(self.blocklisted_components);
let ethereum_client = EthereumRpcClient::new(self.rpc_url.as_str())
.map_err(|e| SolverBuildError::RpcClient(e.to_string()))?;
let (mut gas_price_fetcher, gas_price_worker_signal_tx) =
GasPriceFetcher::new(ethereum_client, Arc::clone(&market_data));
let mut tycho_feed = TychoFeed::new(tycho_feed_config, Arc::clone(&market_data));
tycho_feed = tycho_feed.with_gas_price_worker_signal_tx(gas_price_worker_signal_tx);
let gas_token = native_token(&self.chain).map_err(|_| SolverBuildError::GasToken)?;
let computation_config = ComputationManagerConfig::new()
.with_gas_token(gas_token)
.with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
let (computation_manager, _) =
ComputationManager::new(computation_config, Arc::clone(&market_data))
.map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;
let derived_data: SharedDerivedDataRef = computation_manager.store();
let derived_event_tx = computation_manager.event_sender();
let computation_event_rx = tycho_feed.subscribe();
let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);
let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
let mut worker_pools: Vec<WorkerPool> = Vec::new();
for pool_entry in self.pools {
let pool_event_rx = tycho_feed.subscribe();
let derived_rx = derived_event_tx.subscribe();
let (worker_pool, task_handle) = match pool_entry {
PoolEntry::BuiltIn {
name,
algorithm,
num_workers,
task_queue_capacity,
min_hops,
max_hops,
timeout_ms,
max_routes,
} => {
let algo_cfg = AlgorithmConfig::new(
min_hops,
max_hops,
Duration::from_millis(timeout_ms),
max_routes,
)?;
WorkerPoolBuilder::new()
.name(name)
.algorithm(algorithm)
.algorithm_config(algo_cfg)
.num_workers(num_workers)
.task_queue_capacity(task_queue_capacity)
.build(
Arc::clone(&market_data),
Arc::clone(&derived_data),
pool_event_rx,
derived_rx,
)?
}
PoolEntry::Custom(custom) => {
let algo_cfg = AlgorithmConfig::new(
custom.min_hops,
custom.max_hops,
Duration::from_millis(custom.timeout_ms),
custom.max_routes,
)?;
let builder = WorkerPoolBuilder::new()
.name(custom.name)
.algorithm_config(algo_cfg)
.num_workers(custom.num_workers)
.task_queue_capacity(custom.task_queue_capacity);
let builder = (custom.configure)(builder);
builder.build(
Arc::clone(&market_data),
Arc::clone(&derived_data),
pool_event_rx,
derived_rx,
)?
}
};
solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
worker_pools.push(worker_pool);
}
let encoder = match self.encoder {
Some(enc) => enc,
None => {
let registry = SwapEncoderRegistry::new(self.chain)
.add_default_encoders(None)
.map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
Encoder::new(self.chain, registry)
.map_err(|e| SolverBuildError::Encoder(e.to_string()))?
}
};
let chain = self.chain;
let router_address = encoder.router_address().clone();
let router_config = WorkerPoolRouterConfig::default()
.with_timeout(self.router_timeout)
.with_min_responses(self.router_min_responses);
let mut router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);
if self.price_guard_enabled {
let mut registry = PriceProviderRegistry::new();
let mut worker_handles = Vec::new();
for mut provider in self.price_providers {
worker_handles.push(provider.start(Arc::clone(&market_data)));
registry = registry.register(provider);
}
let price_guard = PriceGuard::new(registry, worker_handles);
router = router.with_price_guard(price_guard);
}
let feed_handle = tokio::spawn(async move {
if let Err(e) = tycho_feed.run().await {
tracing::error!(error = %e, "tycho feed error");
}
});
let gas_price_handle = tokio::spawn(async move {
if let Err(e) = gas_price_fetcher.run().await {
tracing::error!(error = %e, "gas price fetcher error");
}
});
let computation_handle = tokio::spawn(async move {
computation_manager
.run(computation_event_rx, computation_shutdown_rx)
.await;
});
Ok(Solver {
router,
worker_pools,
market_data,
derived_data,
feed_handle,
gas_price_handle,
computation_handle,
computation_shutdown_tx,
chain,
router_address,
})
}
}
/// A running solver produced by `FyndBuilder::build`: the router, its worker pools, the
/// shared data stores and the handles of the spawned background tasks.
pub struct Solver {
/// Router that answers quote requests.
router: WorkerPoolRouter,
/// Worker pools created by `build`, one per registered pool entry.
worker_pools: Vec<WorkerPool>,
/// Shared market data store.
market_data: SharedMarketDataRef,
/// Shared derived data store, obtained from the computation manager.
derived_data: SharedDerivedDataRef,
/// Handle of the task running the Tycho feed.
feed_handle: JoinHandle<()>,
/// Handle of the task running the gas price fetcher.
gas_price_handle: JoinHandle<()>,
/// Handle of the task running the computation manager.
computation_handle: JoinHandle<()>,
/// Sender paired with the shutdown receiver handed to the computation manager.
computation_shutdown_tx: broadcast::Sender<()>,
/// Chain the solver was built for.
chain: Chain,
/// Router address copied from the encoder at build time.
router_address: Bytes,
}
impl Solver {
    /// Returns another handle to the shared market data.
    pub fn market_data(&self) -> SharedMarketDataRef {
        Arc::clone(&self.market_data)
    }

    /// Returns another handle to the shared derived data.
    pub fn derived_data(&self) -> SharedDerivedDataRef {
        Arc::clone(&self.derived_data)
    }

    /// Forwards `request` to the worker pool router and returns its result.
    pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
        self.router.quote(request).await
    }

    /// Waits until the market data has been updated at least once and the derived data
    /// reports ready, polling every 500 ms.
    ///
    /// Readiness is always checked at least once, and once more when the deadline is
    /// reached. The wait between polls never extends past the deadline, so the call does
    /// not overshoot `timeout` by a poll interval. A `timeout` too large to be represented
    /// as a deadline (for example `Duration::MAX`) means "wait indefinitely".
    ///
    /// # Errors
    ///
    /// Returns [`WaitReadyError`] when `timeout` elapses before both conditions hold.
    pub async fn wait_until_ready(&self, timeout: Duration) -> Result<(), WaitReadyError> {
        const POLL_INTERVAL: Duration = Duration::from_millis(500);

        // `None` when `now + timeout` overflows; adding directly would panic instead.
        let deadline = tokio::time::Instant::now().checked_add(timeout);
        loop {
            let market_ready = self
                .market_data
                .read()
                .await
                .last_updated()
                .is_some();
            let derived_ready = self
                .derived_data
                .read()
                .await
                .derived_data_ready();
            if market_ready && derived_ready {
                return Ok(());
            }

            let now = tokio::time::Instant::now();
            let next_poll = now + POLL_INTERVAL;
            let wake_at = match deadline {
                Some(deadline) if now >= deadline => {
                    // Clamp before narrowing so a huge timeout saturates, not truncates.
                    let timeout_ms = timeout.as_millis().min(u128::from(u64::MAX)) as u64;
                    return Err(WaitReadyError { timeout_ms });
                }
                // Never sleep past the deadline.
                Some(deadline) => next_poll.min(deadline),
                None => next_poll,
            };
            tokio::time::sleep_until(wake_at).await;
        }
    }

    /// Stops the solver.
    ///
    /// Sends the shutdown signal to the computation manager (a send failure is ignored),
    /// shuts down every worker pool, and aborts the feed and gas price tasks. The
    /// computation task is signalled rather than aborted.
    pub fn shutdown(self) {
        let _ = self.computation_shutdown_tx.send(());
        for pool in self.worker_pools {
            pool.shutdown();
        }
        self.feed_handle.abort();
        self.gas_price_handle.abort();
    }

    /// Converts the solver into [`SolverParts`], moving every field across unchanged.
    pub fn into_parts(self) -> SolverParts {
        let Self {
            router,
            worker_pools,
            market_data,
            derived_data,
            feed_handle,
            gas_price_handle,
            computation_handle,
            computation_shutdown_tx,
            chain,
            router_address,
        } = self;
        SolverParts {
            router,
            worker_pools,
            market_data,
            derived_data,
            feed_handle,
            gas_price_handle,
            computation_handle,
            computation_shutdown_tx,
            chain,
            router_address,
        }
    }
}
/// The components of a `Solver`, produced by `Solver::into_parts`.
///
/// Holds the same fields as `Solver`; they are exposed through accessors and the
/// consuming `into_router` / `into_components` methods.
pub struct SolverParts {
/// Router that answers quote requests.
router: WorkerPoolRouter,
/// Worker pools created when the solver was built.
worker_pools: Vec<WorkerPool>,
/// Shared market data store.
market_data: SharedMarketDataRef,
/// Shared derived data store.
derived_data: SharedDerivedDataRef,
/// Handle of the task running the Tycho feed.
feed_handle: JoinHandle<()>,
/// Handle of the task running the gas price fetcher.
gas_price_handle: JoinHandle<()>,
/// Handle of the task running the computation manager.
computation_handle: JoinHandle<()>,
/// Sender paired with the shutdown receiver handed to the computation manager.
computation_shutdown_tx: broadcast::Sender<()>,
/// Chain the solver was built for.
chain: Chain,
/// Router address copied from the encoder at build time.
router_address: Bytes,
}
impl SolverParts {
    /// Chain the solver was built for.
    pub fn chain(&self) -> Chain {
        self.chain
    }

    /// Router address copied from the encoder at build time.
    pub fn router_address(&self) -> &Bytes {
        &self.router_address
    }

    /// The worker pools, in the order they were created.
    pub fn worker_pools(&self) -> &[WorkerPool] {
        self.worker_pools.as_slice()
    }

    /// Borrowed handle to the shared market data.
    pub fn market_data(&self) -> &SharedMarketDataRef {
        &self.market_data
    }

    /// Borrowed handle to the shared derived data.
    pub fn derived_data(&self) -> &SharedDerivedDataRef {
        &self.derived_data
    }

    /// Consumes the parts and keeps only the router; every other component is dropped.
    pub fn into_router(self) -> WorkerPoolRouter {
        let Self { router, .. } = self;
        router
    }

    /// Consumes the parts and returns them as a tuple, in this order: router, worker
    /// pools, market data, derived data, feed task handle, gas price task handle,
    /// computation task handle, computation shutdown sender.
    ///
    /// The chain and router address are not part of the tuple and are dropped.
    // The tuple type is the public return type, so the complexity lint is silenced.
    #[allow(clippy::type_complexity)]
    pub fn into_components(
        self,
    ) -> (
        WorkerPoolRouter,
        Vec<WorkerPool>,
        SharedMarketDataRef,
        SharedDerivedDataRef,
        JoinHandle<()>,
        JoinHandle<()>,
        JoinHandle<()>,
        broadcast::Sender<()>,
    ) {
        let Self {
            router,
            worker_pools,
            market_data,
            derived_data,
            feed_handle,
            gas_price_handle,
            computation_handle,
            computation_shutdown_tx,
            chain: _,
            router_address: _,
        } = self;
        (
            router,
            worker_pools,
            market_data,
            derived_data,
            feed_handle,
            gas_price_handle,
            computation_handle,
            computation_shutdown_tx,
        )
    }
}