1use std::{collections::HashSet, sync::Arc, time::Duration};
14
15use num_cpus;
16use serde::{Deserialize, Serialize};
17use tokio::{sync::broadcast, task::JoinHandle};
18use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
19use tycho_simulation::{
20 tycho_common::{models::Chain, Bytes},
21 tycho_ethereum::rpc::EthereumRpcClient,
22};
23
24use crate::{
25 algorithm::{AlgorithmConfig, AlgorithmError},
26 derived::{ComputationManager, ComputationManagerConfig, SharedDerivedDataRef},
27 encoding::encoder::Encoder,
28 feed::{
29 events::MarketEventHandler,
30 gas::GasPriceFetcher,
31 market_data::{SharedMarketData, SharedMarketDataRef},
32 tycho_feed::TychoFeed,
33 TychoFeedConfig,
34 },
35 graph::EdgeWeightUpdaterWithDerived,
36 price_guard::{
37 guard::PriceGuard, provider::PriceProvider, provider_registry::PriceProviderRegistry,
38 },
39 types::constants::native_token,
40 worker_pool::{
41 pool::{WorkerPool, WorkerPoolBuilder},
42 registry::UnknownAlgorithmError,
43 },
44 worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter},
45 Algorithm, Quote, QuoteRequest, SolveError,
46};
47
/// Default values used by `FyndBuilder` and `PoolConfig` for every setting the caller does not
/// override.
pub mod defaults {
    /// Initial `min_token_quality` of a new `FyndBuilder`.
    pub const MIN_TOKEN_QUALITY: i32 = 100;
    /// Initial `traded_n_days_ago` of a new `FyndBuilder`.
    pub const TRADED_N_DAYS_AGO: u64 = 3;
    /// Initial `tvl_buffer_ratio` of a new `FyndBuilder`.
    pub const TVL_BUFFER_RATIO: f64 = 1.1;
    /// Initial `gas_refresh_interval` of a new `FyndBuilder`.
    pub const GAS_REFRESH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
    /// Initial `reconnect_delay` of a new `FyndBuilder`.
    pub const RECONNECT_DELAY: std::time::Duration = std::time::Duration::from_secs(5);
    /// Initial `router_min_responses` of a new `FyndBuilder`.
    pub const ROUTER_MIN_RESPONSES: usize = 0;
    /// Task queue capacity of a worker pool that does not set its own.
    pub const POOL_TASK_QUEUE_CAPACITY: usize = 1_000;
    /// Minimum hop count of a worker pool that does not set its own.
    pub const POOL_MIN_HOPS: usize = 1;
    /// Maximum hop count of a worker pool that does not set its own.
    pub const POOL_MAX_HOPS: usize = 3;
    /// Algorithm timeout, in milliseconds, of a worker pool that does not set its own.
    pub const POOL_TIMEOUT_MS: u64 = 100;
}
78
/// Initial value of the `tycho_use_tls` flag of a new `FyndBuilder`.
const DEFAULT_TYCHO_USE_TLS: bool = true;
/// Depth slippage threshold handed to the computation manager configuration in
/// `FyndBuilder::build`.
const DEFAULT_DEPTH_SLIPPAGE_THRESHOLD: f64 = 0.01;
/// Initial router timeout of a new `FyndBuilder` (10 seconds).
const DEFAULT_ROUTER_TIMEOUT: Duration = Duration::from_millis(10_000);
85
86fn default_task_queue_capacity() -> usize {
89 defaults::POOL_TASK_QUEUE_CAPACITY
90}
91
92fn default_min_hops() -> usize {
93 defaults::POOL_MIN_HOPS
94}
95
96fn default_max_hops() -> usize {
97 defaults::POOL_MAX_HOPS
98}
99
100fn default_algo_timeout_ms() -> u64 {
101 defaults::POOL_TIMEOUT_MS
102}
103
104#[must_use]
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct PoolConfig {
108 algorithm: String,
110 #[serde(default = "num_cpus::get")]
112 num_workers: usize,
113 #[serde(default = "default_task_queue_capacity")]
115 task_queue_capacity: usize,
116 #[serde(default = "default_min_hops")]
118 min_hops: usize,
119 #[serde(default = "default_max_hops")]
121 max_hops: usize,
122 #[serde(default = "default_algo_timeout_ms")]
124 timeout_ms: u64,
125 #[serde(default)]
127 max_routes: Option<usize>,
128}
129
130impl PoolConfig {
131 pub fn new(algorithm: impl Into<String>) -> Self {
133 Self {
134 algorithm: algorithm.into(),
135 num_workers: num_cpus::get(),
136 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
137 min_hops: defaults::POOL_MIN_HOPS,
138 max_hops: defaults::POOL_MAX_HOPS,
139 timeout_ms: defaults::POOL_TIMEOUT_MS,
140 max_routes: None,
141 }
142 }
143
144 pub fn algorithm(&self) -> &str {
146 &self.algorithm
147 }
148
149 pub fn num_workers(&self) -> usize {
151 self.num_workers
152 }
153
154 pub fn with_num_workers(mut self, num_workers: usize) -> Self {
156 self.num_workers = num_workers;
157 self
158 }
159
160 pub fn with_task_queue_capacity(mut self, task_queue_capacity: usize) -> Self {
162 self.task_queue_capacity = task_queue_capacity;
163 self
164 }
165
166 pub fn with_min_hops(mut self, min_hops: usize) -> Self {
168 self.min_hops = min_hops;
169 self
170 }
171
172 pub fn with_max_hops(mut self, max_hops: usize) -> Self {
174 self.max_hops = max_hops;
175 self
176 }
177
178 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
180 self.timeout_ms = timeout_ms;
181 self
182 }
183
184 pub fn with_max_routes(mut self, max_routes: Option<usize>) -> Self {
186 self.max_routes = max_routes;
187 self
188 }
189
190 pub fn task_queue_capacity(&self) -> usize {
192 self.task_queue_capacity
193 }
194
195 pub fn min_hops(&self) -> usize {
197 self.min_hops
198 }
199
200 pub fn max_hops(&self) -> usize {
202 self.max_hops
203 }
204
205 pub fn timeout_ms(&self) -> u64 {
207 self.timeout_ms
208 }
209
210 pub fn max_routes(&self) -> Option<usize> {
212 self.max_routes
213 }
214}
215
216#[derive(Debug, thiserror::Error)]
218#[error("timed out after {timeout_ms}ms waiting for market data and derived computations")]
219pub struct WaitReadyError {
220 timeout_ms: u64,
221}
222
223#[non_exhaustive]
225#[derive(Debug, thiserror::Error)]
226pub enum SolverBuildError {
227 #[error("failed to create ethereum RPC client: {0}")]
229 RpcClient(String),
230 #[error(transparent)]
232 AlgorithmConfig(#[from] AlgorithmError),
233 #[error("failed to create computation manager: {0}")]
235 ComputationManager(String),
236 #[error("failed to create encoder: {0}")]
238 Encoder(String),
239 #[error(transparent)]
241 UnknownAlgorithm(#[from] UnknownAlgorithmError),
242 #[error("gas token not configured for chain")]
244 GasToken,
245 #[error("no worker pools configured")]
247 NoPools,
248}
249
250enum PoolEntry {
252 BuiltIn {
253 name: String,
254 algorithm: String,
255 num_workers: usize,
256 task_queue_capacity: usize,
257 min_hops: usize,
258 max_hops: usize,
259 timeout_ms: u64,
260 max_routes: Option<usize>,
261 },
262 Custom(CustomPoolEntry),
263}
264
265struct CustomPoolEntry {
267 name: String,
268 num_workers: usize,
269 task_queue_capacity: usize,
270 min_hops: usize,
271 max_hops: usize,
272 timeout_ms: u64,
273 max_routes: Option<usize>,
274 configure: Box<dyn FnOnce(WorkerPoolBuilder) -> WorkerPoolBuilder + Send>,
276}
277
278#[must_use = "a builder does nothing until .build() is called"]
280pub struct FyndBuilder {
283 chain: Chain,
284 tycho_url: String,
285 rpc_url: String,
286 protocols: Vec<String>,
287 min_tvl: f64,
288 tycho_api_key: Option<String>,
289 tycho_use_tls: bool,
290 min_token_quality: i32,
291 traded_n_days_ago: u64,
292 tvl_buffer_ratio: f64,
293 gas_refresh_interval: Duration,
294 reconnect_delay: Duration,
295 blocklisted_components: HashSet<String>,
296 router_timeout: Duration,
297 router_min_responses: usize,
298 encoder: Option<Encoder>,
299 pools: Vec<PoolEntry>,
300 price_guard_enabled: bool,
301 price_providers: Vec<Box<dyn PriceProvider>>,
302}
303
304impl FyndBuilder {
305 pub fn new(
307 chain: Chain,
308 tycho_url: impl Into<String>,
309 rpc_url: impl Into<String>,
310 protocols: Vec<String>,
311 min_tvl: f64,
312 ) -> Self {
313 Self {
314 chain,
315 tycho_url: tycho_url.into(),
316 rpc_url: rpc_url.into(),
317 protocols,
318 min_tvl,
319 tycho_api_key: None,
320 tycho_use_tls: DEFAULT_TYCHO_USE_TLS,
321 min_token_quality: defaults::MIN_TOKEN_QUALITY,
322 traded_n_days_ago: defaults::TRADED_N_DAYS_AGO,
323 tvl_buffer_ratio: defaults::TVL_BUFFER_RATIO,
324 gas_refresh_interval: defaults::GAS_REFRESH_INTERVAL,
325 reconnect_delay: defaults::RECONNECT_DELAY,
326 blocklisted_components: HashSet::new(),
327 router_timeout: DEFAULT_ROUTER_TIMEOUT,
328 router_min_responses: defaults::ROUTER_MIN_RESPONSES,
329 encoder: None,
330 pools: Vec::new(),
331 price_guard_enabled: false,
332 price_providers: Vec::new(),
333 }
334 }
335
336 pub fn chain(&self) -> Chain {
338 self.chain
339 }
340
341 pub fn tycho_api_key(mut self, key: impl Into<String>) -> Self {
343 self.tycho_api_key = Some(key.into());
344 self
345 }
346
347 pub fn min_tvl(mut self, min_tvl: f64) -> Self {
349 self.min_tvl = min_tvl;
350 self
351 }
352
353 pub fn tycho_use_tls(mut self, use_tls: bool) -> Self {
355 self.tycho_use_tls = use_tls;
356 self
357 }
358
359 pub fn min_token_quality(mut self, quality: i32) -> Self {
362 self.min_token_quality = quality;
363 self
364 }
365
366 pub fn traded_n_days_ago(mut self, days: u64) -> Self {
368 self.traded_n_days_ago = days;
369 self
370 }
371
372 pub fn tvl_buffer_ratio(mut self, ratio: f64) -> Self {
374 self.tvl_buffer_ratio = ratio;
375 self
376 }
377
378 pub fn gas_refresh_interval(mut self, interval: Duration) -> Self {
380 self.gas_refresh_interval = interval;
381 self
382 }
383
384 pub fn reconnect_delay(mut self, delay: Duration) -> Self {
386 self.reconnect_delay = delay;
387 self
388 }
389
390 pub fn blocklisted_components(mut self, components: HashSet<String>) -> Self {
392 self.blocklisted_components = components;
393 self
394 }
395
396 pub fn worker_router_timeout(mut self, timeout: Duration) -> Self {
398 self.router_timeout = timeout;
399 self
400 }
401
402 pub fn worker_router_min_responses(mut self, min: usize) -> Self {
404 self.router_min_responses = min;
405 self
406 }
407
408 pub fn encoder(mut self, encoder: Encoder) -> Self {
410 self.encoder = Some(encoder);
411 self
412 }
413
414 pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
416 self.pools.push(PoolEntry::BuiltIn {
417 name: "default".to_string(),
418 algorithm: algorithm.into(),
419 num_workers: num_cpus::get(),
420 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
421 min_hops: defaults::POOL_MIN_HOPS,
422 max_hops: defaults::POOL_MAX_HOPS,
423 timeout_ms: defaults::POOL_TIMEOUT_MS,
424 max_routes: None,
425 });
426 self
427 }
428
429 pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
433 where
434 A: Algorithm + 'static,
435 A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
436 F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
437 {
438 let name = name.into();
439 let algo_name = name.clone();
440 let configure =
441 Box::new(move |builder: WorkerPoolBuilder| builder.with_algorithm(algo_name, factory));
442 self.pools
443 .push(PoolEntry::Custom(CustomPoolEntry {
444 name,
445 num_workers: num_cpus::get(),
446 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
447 min_hops: defaults::POOL_MIN_HOPS,
448 max_hops: defaults::POOL_MAX_HOPS,
449 timeout_ms: defaults::POOL_TIMEOUT_MS,
450 max_routes: None,
451 configure,
452 }));
453 self
454 }
455
456 pub fn add_default_price_providers(self) -> Self {
463 self.register_price_provider(Box::new(
464 crate::price_guard::hyperliquid::HyperliquidProvider::default(),
465 ))
466 .register_price_provider(Box::new(
467 crate::price_guard::binance_ws::BinanceWsProvider::default(),
468 ))
469 }
470
471 pub fn register_price_provider(mut self, provider: Box<dyn PriceProvider>) -> Self {
476 self.price_providers.push(provider);
477 self
478 }
479
480 pub fn price_guard_enabled(mut self, enabled: bool) -> Self {
487 self.price_guard_enabled = enabled;
488 self
489 }
490
491 pub fn add_pool(mut self, name: impl Into<String>, config: &PoolConfig) -> Self {
493 self.pools.push(PoolEntry::BuiltIn {
494 name: name.into(),
495 algorithm: config.algorithm().to_string(),
496 num_workers: config.num_workers(),
497 task_queue_capacity: config.task_queue_capacity(),
498 min_hops: config.min_hops(),
499 max_hops: config.max_hops(),
500 timeout_ms: config.timeout_ms(),
501 max_routes: config.max_routes(),
502 });
503 self
504 }
505
506 pub fn build(mut self) -> Result<Solver, SolverBuildError> {
512 if self.pools.is_empty() {
513 return Err(SolverBuildError::NoPools);
514 }
515
516 if self.price_providers.is_empty() {
518 self = self.add_default_price_providers();
519 }
520
521 let market_data = Arc::new(tokio::sync::RwLock::new(SharedMarketData::new()));
522
523 let tycho_feed_config = TychoFeedConfig::new(
524 self.tycho_url,
525 self.chain,
526 self.tycho_api_key,
527 self.tycho_use_tls,
528 self.protocols,
529 self.min_tvl,
530 )
531 .tvl_buffer_ratio(self.tvl_buffer_ratio)
532 .gas_refresh_interval(self.gas_refresh_interval)
533 .reconnect_delay(self.reconnect_delay)
534 .min_token_quality(self.min_token_quality)
535 .traded_n_days_ago(self.traded_n_days_ago)
536 .blocklisted_components(self.blocklisted_components);
537
538 let ethereum_client = EthereumRpcClient::new(self.rpc_url.as_str())
539 .map_err(|e| SolverBuildError::RpcClient(e.to_string()))?;
540
541 let (mut gas_price_fetcher, gas_price_worker_signal_tx) =
542 GasPriceFetcher::new(ethereum_client, Arc::clone(&market_data));
543
544 let mut tycho_feed = TychoFeed::new(tycho_feed_config, Arc::clone(&market_data));
545 tycho_feed = tycho_feed.with_gas_price_worker_signal_tx(gas_price_worker_signal_tx);
546
547 let gas_token = native_token(&self.chain).map_err(|_| SolverBuildError::GasToken)?;
548 let computation_config = ComputationManagerConfig::new()
549 .with_gas_token(gas_token)
550 .with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
551 let (computation_manager, _) =
554 ComputationManager::new(computation_config, Arc::clone(&market_data))
555 .map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;
556
557 let derived_data: SharedDerivedDataRef = computation_manager.store();
558 let derived_event_tx = computation_manager.event_sender();
559
560 let computation_event_rx = tycho_feed.subscribe();
562 let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);
563
564 let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
565 let mut worker_pools: Vec<WorkerPool> = Vec::new();
566
567 for pool_entry in self.pools {
568 let pool_event_rx = tycho_feed.subscribe();
569 let derived_rx = derived_event_tx.subscribe();
570
571 let (worker_pool, task_handle) = match pool_entry {
572 PoolEntry::BuiltIn {
573 name,
574 algorithm,
575 num_workers,
576 task_queue_capacity,
577 min_hops,
578 max_hops,
579 timeout_ms,
580 max_routes,
581 } => {
582 let algo_cfg = AlgorithmConfig::new(
583 min_hops,
584 max_hops,
585 Duration::from_millis(timeout_ms),
586 max_routes,
587 )?;
588 WorkerPoolBuilder::new()
589 .name(name)
590 .algorithm(algorithm)
591 .algorithm_config(algo_cfg)
592 .num_workers(num_workers)
593 .task_queue_capacity(task_queue_capacity)
594 .build(
595 Arc::clone(&market_data),
596 Arc::clone(&derived_data),
597 pool_event_rx,
598 derived_rx,
599 )?
600 }
601 PoolEntry::Custom(custom) => {
602 let algo_cfg = AlgorithmConfig::new(
603 custom.min_hops,
604 custom.max_hops,
605 Duration::from_millis(custom.timeout_ms),
606 custom.max_routes,
607 )?;
608 let builder = WorkerPoolBuilder::new()
609 .name(custom.name)
610 .algorithm_config(algo_cfg)
611 .num_workers(custom.num_workers)
612 .task_queue_capacity(custom.task_queue_capacity);
613 let builder = (custom.configure)(builder);
614 builder.build(
615 Arc::clone(&market_data),
616 Arc::clone(&derived_data),
617 pool_event_rx,
618 derived_rx,
619 )?
620 }
621 };
622
623 solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
624 worker_pools.push(worker_pool);
625 }
626
627 let encoder = match self.encoder {
628 Some(enc) => enc,
629 None => {
630 let registry = SwapEncoderRegistry::new(self.chain)
631 .add_default_encoders(None)
632 .map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
633 Encoder::new(self.chain, registry)
634 .map_err(|e| SolverBuildError::Encoder(e.to_string()))?
635 }
636 };
637
638 let chain = self.chain;
639 let router_address = encoder.router_address().clone();
640
641 let router_config = WorkerPoolRouterConfig::default()
644 .with_timeout(self.router_timeout)
645 .with_min_responses(self.router_min_responses);
646 let mut router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);
647
648 if self.price_guard_enabled {
649 let mut registry = PriceProviderRegistry::new();
650 let mut worker_handles = Vec::new();
651 for mut provider in self.price_providers {
652 worker_handles.push(provider.start(Arc::clone(&market_data)));
653 registry = registry.register(provider);
654 }
655 let price_guard = PriceGuard::new(registry, worker_handles);
656 router = router.with_price_guard(price_guard);
657 }
658
659 let feed_handle = tokio::spawn(async move {
660 if let Err(e) = tycho_feed.run().await {
661 tracing::error!(error = %e, "tycho feed error");
662 }
663 });
664
665 let gas_price_handle = tokio::spawn(async move {
666 if let Err(e) = gas_price_fetcher.run().await {
667 tracing::error!(error = %e, "gas price fetcher error");
668 }
669 });
670
671 let computation_handle = tokio::spawn(async move {
672 computation_manager
673 .run(computation_event_rx, computation_shutdown_rx)
674 .await;
675 });
676
677 Ok(Solver {
678 router,
679 worker_pools,
680 market_data,
681 derived_data,
682 feed_handle,
683 gas_price_handle,
684 computation_handle,
685 computation_shutdown_tx,
686 chain,
687 router_address,
688 })
689 }
690}
691
692pub struct Solver {
694 router: WorkerPoolRouter,
695 worker_pools: Vec<WorkerPool>,
696 market_data: SharedMarketDataRef,
697 derived_data: SharedDerivedDataRef,
698 feed_handle: JoinHandle<()>,
699 gas_price_handle: JoinHandle<()>,
700 computation_handle: JoinHandle<()>,
701 computation_shutdown_tx: broadcast::Sender<()>,
702 chain: Chain,
703 router_address: Bytes,
704}
705
706impl Solver {
707 pub fn market_data(&self) -> SharedMarketDataRef {
709 Arc::clone(&self.market_data)
710 }
711
712 pub fn derived_data(&self) -> SharedDerivedDataRef {
714 Arc::clone(&self.derived_data)
715 }
716
717 pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
723 self.router.quote(request).await
724 }
725
726 pub async fn wait_until_ready(&self, timeout: Duration) -> Result<(), WaitReadyError> {
742 const POLL_INTERVAL: Duration = Duration::from_millis(500);
743
744 let deadline = tokio::time::Instant::now() + timeout;
745
746 loop {
747 let market_ready = self
748 .market_data
749 .read()
750 .await
751 .last_updated()
752 .is_some();
753 let derived_ready = self
754 .derived_data
755 .read()
756 .await
757 .derived_data_ready();
758
759 if market_ready && derived_ready {
760 return Ok(());
761 }
762
763 if tokio::time::Instant::now() >= deadline {
764 return Err(WaitReadyError { timeout_ms: timeout.as_millis() as u64 });
765 }
766
767 tokio::time::sleep(POLL_INTERVAL).await;
768 }
769 }
770
771 pub fn shutdown(self) {
773 let _ = self.computation_shutdown_tx.send(());
774 for pool in self.worker_pools {
775 pool.shutdown();
776 }
777 self.feed_handle.abort();
778 self.gas_price_handle.abort();
779 }
780
781 pub fn into_parts(self) -> SolverParts {
783 SolverParts {
784 router: self.router,
785 worker_pools: self.worker_pools,
786 market_data: self.market_data,
787 derived_data: self.derived_data,
788 feed_handle: self.feed_handle,
789 gas_price_handle: self.gas_price_handle,
790 computation_handle: self.computation_handle,
791 computation_shutdown_tx: self.computation_shutdown_tx,
792 chain: self.chain,
793 router_address: self.router_address,
794 }
795 }
796}
797
798pub struct SolverParts {
802 router: WorkerPoolRouter,
804 worker_pools: Vec<WorkerPool>,
806 market_data: SharedMarketDataRef,
808 derived_data: SharedDerivedDataRef,
810 feed_handle: JoinHandle<()>,
812 gas_price_handle: JoinHandle<()>,
814 computation_handle: JoinHandle<()>,
816 computation_shutdown_tx: broadcast::Sender<()>,
818 chain: Chain,
820 router_address: Bytes,
822}
823
824impl SolverParts {
825 pub fn chain(&self) -> Chain {
827 self.chain
828 }
829
830 pub fn router_address(&self) -> &Bytes {
832 &self.router_address
833 }
834
835 pub fn worker_pools(&self) -> &[WorkerPool] {
837 &self.worker_pools
838 }
839
840 pub fn market_data(&self) -> &SharedMarketDataRef {
842 &self.market_data
843 }
844
845 pub fn derived_data(&self) -> &SharedDerivedDataRef {
847 &self.derived_data
848 }
849
850 pub fn into_router(self) -> WorkerPoolRouter {
852 self.router
853 }
854
855 #[allow(clippy::type_complexity)]
857 pub fn into_components(
858 self,
859 ) -> (
860 WorkerPoolRouter,
861 Vec<WorkerPool>,
862 SharedMarketDataRef,
863 SharedDerivedDataRef,
864 JoinHandle<()>,
865 JoinHandle<()>,
866 JoinHandle<()>,
867 broadcast::Sender<()>,
868 ) {
869 (
870 self.router,
871 self.worker_pools,
872 self.market_data,
873 self.derived_data,
874 self.feed_handle,
875 self.gas_price_handle,
876 self.computation_handle,
877 self.computation_shutdown_tx,
878 )
879 }
880}