1use std::{collections::HashSet, sync::Arc, time::Duration};
14
15use num_cpus;
16use serde::{Deserialize, Serialize};
17use tokio::{sync::broadcast, task::JoinHandle};
18use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
19use tycho_simulation::{
20 tycho_common::{models::Chain, Bytes},
21 tycho_ethereum::rpc::EthereumRpcClient,
22};
23
24use crate::{
25 algorithm::{AlgorithmConfig, AlgorithmError},
26 derived::{ComputationManager, ComputationManagerConfig, SharedDerivedDataRef},
27 encoding::encoder::Encoder,
28 feed::{
29 events::MarketEventHandler,
30 gas::GasPriceFetcher,
31 market_data::{SharedMarketData, SharedMarketDataRef},
32 tycho_feed::TychoFeed,
33 TychoFeedConfig,
34 },
35 graph::EdgeWeightUpdaterWithDerived,
36 types::constants::native_token,
37 worker_pool::{
38 pool::{WorkerPool, WorkerPoolBuilder},
39 registry::UnknownAlgorithmError,
40 },
41 worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter},
42 Algorithm, Quote, QuoteRequest, SolveError,
43};
44
/// Default values used by [`FyndBuilder`](super::FyndBuilder) and
/// [`PoolConfig`](super::PoolConfig) when an option is not set explicitly.
pub mod defaults {
    use std::time::Duration;

    /// Initial `min_token_quality` of a freshly created `FyndBuilder`.
    pub const MIN_TOKEN_QUALITY: i32 = 100;
    /// Initial `traded_n_days_ago` of a freshly created `FyndBuilder`.
    pub const TRADED_N_DAYS_AGO: u64 = 3;
    /// Initial `tvl_buffer_ratio` of a freshly created `FyndBuilder`.
    pub const TVL_BUFFER_RATIO: f64 = 1.1;
    /// Initial `gas_refresh_interval` of a freshly created `FyndBuilder` (30 seconds).
    pub const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
    /// Initial `reconnect_delay` of a freshly created `FyndBuilder` (5 seconds).
    pub const RECONNECT_DELAY: Duration = Duration::from_secs(5);
    /// Initial `router_min_responses` of a freshly created `FyndBuilder`.
    pub const ROUTER_MIN_RESPONSES: usize = 0;
    /// Task queue capacity given to a pool that does not specify one.
    pub const POOL_TASK_QUEUE_CAPACITY: usize = 1000;
    /// Minimum hop count given to a pool that does not specify one.
    pub const POOL_MIN_HOPS: usize = 1;
    /// Maximum hop count given to a pool that does not specify one.
    pub const POOL_MAX_HOPS: usize = 3;
    /// Algorithm timeout, in milliseconds, given to a pool that does not specify one.
    pub const POOL_TIMEOUT_MS: u64 = 100;
}
64
/// Initial value of `FyndBuilder::tycho_use_tls`; override with `FyndBuilder::tycho_use_tls`.
const DEFAULT_TYCHO_USE_TLS: bool = true;
/// Value handed to `ComputationManagerConfig::with_depth_slippage_threshold` in
/// `FyndBuilder::build`; the builder exposes no setter for it.
const DEFAULT_DEPTH_SLIPPAGE_THRESHOLD: f64 = 0.01;
/// Initial router timeout (10 seconds); override with `FyndBuilder::worker_router_timeout`.
const DEFAULT_ROUTER_TIMEOUT: Duration = Duration::from_secs(10);
71
/// Serde default hook for `PoolConfig::task_queue_capacity`.
///
/// Referenced by path from the `#[serde(default = "...")]` attribute, which requires a
/// function rather than a constant.
fn default_task_queue_capacity() -> usize {
    defaults::POOL_TASK_QUEUE_CAPACITY
}
77
/// Serde default hook for `PoolConfig::min_hops`; returns [`defaults::POOL_MIN_HOPS`].
fn default_min_hops() -> usize {
    defaults::POOL_MIN_HOPS
}
81
/// Serde default hook for `PoolConfig::max_hops`; returns [`defaults::POOL_MAX_HOPS`].
fn default_max_hops() -> usize {
    defaults::POOL_MAX_HOPS
}
85
/// Serde default hook for `PoolConfig::timeout_ms`; returns [`defaults::POOL_TIMEOUT_MS`].
fn default_algo_timeout_ms() -> u64 {
    defaults::POOL_TIMEOUT_MS
}
89
/// Settings for a single worker pool.
///
/// Every field except `algorithm` has a serde default, so a serialized config only needs
/// to name the algorithm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolConfig {
    /// Name of the algorithm the pool runs; handed to `WorkerPoolBuilder::algorithm`.
    algorithm: String,
    /// Number of workers; defaults to `num_cpus::get()` when absent.
    #[serde(default = "num_cpus::get")]
    num_workers: usize,
    /// Capacity of the pool's task queue.
    #[serde(default = "default_task_queue_capacity")]
    task_queue_capacity: usize,
    /// Minimum hop count passed to `AlgorithmConfig::new`.
    #[serde(default = "default_min_hops")]
    min_hops: usize,
    /// Maximum hop count passed to `AlgorithmConfig::new`.
    #[serde(default = "default_max_hops")]
    max_hops: usize,
    /// Algorithm timeout in milliseconds; converted with `Duration::from_millis`.
    #[serde(default = "default_algo_timeout_ms")]
    timeout_ms: u64,
    /// Optional route limit passed to `AlgorithmConfig::new`; `None` when absent.
    #[serde(default)]
    max_routes: Option<usize>,
}
114
115impl PoolConfig {
116 pub fn new(algorithm: impl Into<String>) -> Self {
118 Self {
119 algorithm: algorithm.into(),
120 num_workers: num_cpus::get(),
121 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
122 min_hops: defaults::POOL_MIN_HOPS,
123 max_hops: defaults::POOL_MAX_HOPS,
124 timeout_ms: defaults::POOL_TIMEOUT_MS,
125 max_routes: None,
126 }
127 }
128
129 pub fn algorithm(&self) -> &str {
131 &self.algorithm
132 }
133
134 pub fn num_workers(&self) -> usize {
136 self.num_workers
137 }
138
139 pub fn with_num_workers(mut self, num_workers: usize) -> Self {
141 self.num_workers = num_workers;
142 self
143 }
144
145 pub fn with_task_queue_capacity(mut self, task_queue_capacity: usize) -> Self {
147 self.task_queue_capacity = task_queue_capacity;
148 self
149 }
150
151 pub fn with_min_hops(mut self, min_hops: usize) -> Self {
153 self.min_hops = min_hops;
154 self
155 }
156
157 pub fn with_max_hops(mut self, max_hops: usize) -> Self {
159 self.max_hops = max_hops;
160 self
161 }
162
163 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
165 self.timeout_ms = timeout_ms;
166 self
167 }
168
169 pub fn with_max_routes(mut self, max_routes: Option<usize>) -> Self {
171 self.max_routes = max_routes;
172 self
173 }
174
175 pub fn task_queue_capacity(&self) -> usize {
177 self.task_queue_capacity
178 }
179
180 pub fn min_hops(&self) -> usize {
182 self.min_hops
183 }
184
185 pub fn max_hops(&self) -> usize {
187 self.max_hops
188 }
189
190 pub fn timeout_ms(&self) -> u64 {
192 self.timeout_ms
193 }
194
195 pub fn max_routes(&self) -> Option<usize> {
197 self.max_routes
198 }
199}
200
/// Returned by `Solver::wait_until_ready` when the timeout elapses before both the market
/// data and the derived data report an update.
#[derive(Debug, thiserror::Error)]
#[error("timed out after {timeout_ms}ms waiting for market data and derived computations")]
pub struct WaitReadyError {
    /// The timeout that was requested, in milliseconds.
    timeout_ms: u64,
}
207
/// Errors that `FyndBuilder::build` can return.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum SolverBuildError {
    /// `EthereumRpcClient::new` failed; carries the stringified cause.
    #[error("failed to create ethereum RPC client: {0}")]
    RpcClient(String),
    /// `AlgorithmConfig::new` rejected a pool's hop/timeout/route settings.
    #[error(transparent)]
    AlgorithmConfig(#[from] AlgorithmError),
    /// `ComputationManager::new` failed; carries the stringified cause.
    #[error("failed to create computation manager: {0}")]
    ComputationManager(String),
    /// Building the default swap encoder registry or `Encoder` failed.
    #[error("failed to create encoder: {0}")]
    Encoder(String),
    /// Converted from `UnknownAlgorithmError`.
    // NOTE(review): presumably raised by `WorkerPoolBuilder::build` for an unregistered
    // algorithm name — confirm against the worker pool module.
    #[error(transparent)]
    UnknownAlgorithm(#[from] UnknownAlgorithmError),
    /// `native_token` returned an error for the configured chain.
    #[error("gas token not configured for chain")]
    GasToken,
    /// `build` was called before any pool was added.
    #[error("no worker pools configured")]
    NoPools,
}
234
/// A pool queued on the builder, turned into a `WorkerPool` by `FyndBuilder::build`.
enum PoolEntry {
    /// Pool whose algorithm is selected by name via `WorkerPoolBuilder::algorithm`.
    BuiltIn {
        /// Pool name handed to `WorkerPoolBuilder::name`.
        name: String,
        /// Algorithm name handed to `WorkerPoolBuilder::algorithm`.
        algorithm: String,
        /// Number of workers.
        num_workers: usize,
        /// Capacity of the task queue.
        task_queue_capacity: usize,
        /// Minimum hop count for `AlgorithmConfig::new`.
        min_hops: usize,
        /// Maximum hop count for `AlgorithmConfig::new`.
        max_hops: usize,
        /// Algorithm timeout in milliseconds.
        timeout_ms: u64,
        /// Optional route limit for `AlgorithmConfig::new`.
        max_routes: Option<usize>,
    },
    /// Pool whose algorithm is installed by a caller-supplied closure.
    Custom(CustomPoolEntry),
}
249
/// Pool settings plus a closure that installs a custom algorithm on the pool builder.
struct CustomPoolEntry {
    /// Pool name handed to `WorkerPoolBuilder::name`.
    name: String,
    /// Number of workers.
    num_workers: usize,
    /// Capacity of the task queue.
    task_queue_capacity: usize,
    /// Minimum hop count for `AlgorithmConfig::new`.
    min_hops: usize,
    /// Maximum hop count for `AlgorithmConfig::new`.
    max_hops: usize,
    /// Algorithm timeout in milliseconds.
    timeout_ms: u64,
    /// Optional route limit for `AlgorithmConfig::new`.
    max_routes: Option<usize>,
    /// Applied once in `FyndBuilder::build` to the otherwise configured
    /// `WorkerPoolBuilder`; boxed so entries with different closure types share one field.
    configure: Box<dyn FnOnce(WorkerPoolBuilder) -> WorkerPoolBuilder + Send>,
}
262
/// Builder that collects feed, router and pool settings and assembles a [`Solver`].
#[must_use = "a builder does nothing until .build() is called"]
pub struct FyndBuilder {
    /// Chain used for the feed, the gas token lookup and the default encoder.
    chain: Chain,
    /// URL passed to `TychoFeedConfig::new`.
    tycho_url: String,
    /// URL passed to `EthereumRpcClient::new`.
    rpc_url: String,
    /// Protocol list passed to `TychoFeedConfig::new`.
    protocols: Vec<String>,
    /// Minimum TVL passed to `TychoFeedConfig::new`.
    min_tvl: f64,
    /// Optional API key passed to `TychoFeedConfig::new`.
    tycho_api_key: Option<String>,
    /// TLS flag passed to `TychoFeedConfig::new`.
    tycho_use_tls: bool,
    /// Forwarded to `TychoFeedConfig::min_token_quality`.
    min_token_quality: i32,
    /// Forwarded to `TychoFeedConfig::traded_n_days_ago`.
    traded_n_days_ago: u64,
    /// Forwarded to `TychoFeedConfig::tvl_buffer_ratio`.
    tvl_buffer_ratio: f64,
    /// Forwarded to `TychoFeedConfig::gas_refresh_interval`.
    gas_refresh_interval: Duration,
    /// Forwarded to `TychoFeedConfig::reconnect_delay`.
    reconnect_delay: Duration,
    /// Forwarded to `TychoFeedConfig::blacklisted_components`.
    blacklisted_components: HashSet<String>,
    /// Forwarded to `WorkerPoolRouterConfig::with_timeout`.
    router_timeout: Duration,
    /// Forwarded to `WorkerPoolRouterConfig::with_min_responses`.
    router_min_responses: usize,
    /// Caller-supplied encoder; when `None`, `build` creates one from the default
    /// swap encoder registry.
    encoder: Option<Encoder>,
    /// Pools to create, in the order they were added.
    pools: Vec<PoolEntry>,
}
286
287impl FyndBuilder {
288 pub fn new(
290 chain: Chain,
291 tycho_url: impl Into<String>,
292 rpc_url: impl Into<String>,
293 protocols: Vec<String>,
294 min_tvl: f64,
295 ) -> Self {
296 Self {
297 chain,
298 tycho_url: tycho_url.into(),
299 rpc_url: rpc_url.into(),
300 protocols,
301 min_tvl,
302 tycho_api_key: None,
303 tycho_use_tls: DEFAULT_TYCHO_USE_TLS,
304 min_token_quality: defaults::MIN_TOKEN_QUALITY,
305 traded_n_days_ago: defaults::TRADED_N_DAYS_AGO,
306 tvl_buffer_ratio: defaults::TVL_BUFFER_RATIO,
307 gas_refresh_interval: defaults::GAS_REFRESH_INTERVAL,
308 reconnect_delay: defaults::RECONNECT_DELAY,
309 blacklisted_components: HashSet::new(),
310 router_timeout: DEFAULT_ROUTER_TIMEOUT,
311 router_min_responses: defaults::ROUTER_MIN_RESPONSES,
312 encoder: None,
313 pools: Vec::new(),
314 }
315 }
316
317 pub fn chain(&self) -> Chain {
318 self.chain
319 }
320
321 pub fn tycho_api_key(mut self, key: impl Into<String>) -> Self {
323 self.tycho_api_key = Some(key.into());
324 self
325 }
326
327 pub fn min_tvl(mut self, min_tvl: f64) -> Self {
329 self.min_tvl = min_tvl;
330 self
331 }
332
333 pub fn tycho_use_tls(mut self, use_tls: bool) -> Self {
335 self.tycho_use_tls = use_tls;
336 self
337 }
338
339 pub fn min_token_quality(mut self, quality: i32) -> Self {
342 self.min_token_quality = quality;
343 self
344 }
345
346 pub fn traded_n_days_ago(mut self, days: u64) -> Self {
348 self.traded_n_days_ago = days;
349 self
350 }
351
352 pub fn tvl_buffer_ratio(mut self, ratio: f64) -> Self {
354 self.tvl_buffer_ratio = ratio;
355 self
356 }
357
358 pub fn gas_refresh_interval(mut self, interval: Duration) -> Self {
360 self.gas_refresh_interval = interval;
361 self
362 }
363
364 pub fn reconnect_delay(mut self, delay: Duration) -> Self {
366 self.reconnect_delay = delay;
367 self
368 }
369
370 pub fn blacklisted_components(mut self, components: HashSet<String>) -> Self {
372 self.blacklisted_components = components;
373 self
374 }
375
376 pub fn worker_router_timeout(mut self, timeout: Duration) -> Self {
378 self.router_timeout = timeout;
379 self
380 }
381
382 pub fn worker_router_min_responses(mut self, min: usize) -> Self {
384 self.router_min_responses = min;
385 self
386 }
387
388 pub fn encoder(mut self, encoder: Encoder) -> Self {
390 self.encoder = Some(encoder);
391 self
392 }
393
394 pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
396 self.pools.push(PoolEntry::BuiltIn {
397 name: "default".to_string(),
398 algorithm: algorithm.into(),
399 num_workers: num_cpus::get(),
400 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
401 min_hops: defaults::POOL_MIN_HOPS,
402 max_hops: defaults::POOL_MAX_HOPS,
403 timeout_ms: defaults::POOL_TIMEOUT_MS,
404 max_routes: None,
405 });
406 self
407 }
408
409 pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
413 where
414 A: Algorithm + 'static,
415 A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
416 F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
417 {
418 let name = name.into();
419 let algo_name = name.clone();
420 let configure =
421 Box::new(move |builder: WorkerPoolBuilder| builder.with_algorithm(algo_name, factory));
422 self.pools
423 .push(PoolEntry::Custom(CustomPoolEntry {
424 name,
425 num_workers: num_cpus::get(),
426 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
427 min_hops: defaults::POOL_MIN_HOPS,
428 max_hops: defaults::POOL_MAX_HOPS,
429 timeout_ms: defaults::POOL_TIMEOUT_MS,
430 max_routes: None,
431 configure,
432 }));
433 self
434 }
435
436 pub fn add_pool(mut self, name: impl Into<String>, config: &PoolConfig) -> Self {
438 self.pools.push(PoolEntry::BuiltIn {
439 name: name.into(),
440 algorithm: config.algorithm().to_string(),
441 num_workers: config.num_workers(),
442 task_queue_capacity: config.task_queue_capacity(),
443 min_hops: config.min_hops(),
444 max_hops: config.max_hops(),
445 timeout_ms: config.timeout_ms(),
446 max_routes: config.max_routes(),
447 });
448 self
449 }
450
451 pub fn build(self) -> Result<Solver, SolverBuildError> {
457 if self.pools.is_empty() {
458 return Err(SolverBuildError::NoPools);
459 }
460
461 let market_data = Arc::new(tokio::sync::RwLock::new(SharedMarketData::new()));
462
463 let tycho_feed_config = TychoFeedConfig::new(
464 self.tycho_url,
465 self.chain,
466 self.tycho_api_key,
467 self.tycho_use_tls,
468 self.protocols,
469 self.min_tvl,
470 )
471 .tvl_buffer_ratio(self.tvl_buffer_ratio)
472 .gas_refresh_interval(self.gas_refresh_interval)
473 .reconnect_delay(self.reconnect_delay)
474 .min_token_quality(self.min_token_quality)
475 .traded_n_days_ago(self.traded_n_days_ago)
476 .blacklisted_components(self.blacklisted_components);
477
478 let ethereum_client = EthereumRpcClient::new(self.rpc_url.as_str())
479 .map_err(|e| SolverBuildError::RpcClient(e.to_string()))?;
480
481 let (mut gas_price_fetcher, gas_price_worker_signal_tx) =
482 GasPriceFetcher::new(ethereum_client, Arc::clone(&market_data));
483
484 let mut tycho_feed = TychoFeed::new(tycho_feed_config, Arc::clone(&market_data));
485 tycho_feed = tycho_feed.with_gas_price_worker_signal_tx(gas_price_worker_signal_tx);
486
487 let gas_token = native_token(&self.chain).map_err(|_| SolverBuildError::GasToken)?;
488 let computation_config = ComputationManagerConfig::new()
489 .with_gas_token(gas_token)
490 .with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
491 let (computation_manager, _) =
494 ComputationManager::new(computation_config, Arc::clone(&market_data))
495 .map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;
496
497 let derived_data: SharedDerivedDataRef = computation_manager.store();
498 let derived_event_tx = computation_manager.event_sender();
499
500 let computation_event_rx = tycho_feed.subscribe();
502 let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);
503
504 let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
505 let mut worker_pools: Vec<WorkerPool> = Vec::new();
506
507 for pool_entry in self.pools {
508 let pool_event_rx = tycho_feed.subscribe();
509 let derived_rx = derived_event_tx.subscribe();
510
511 let (worker_pool, task_handle) = match pool_entry {
512 PoolEntry::BuiltIn {
513 name,
514 algorithm,
515 num_workers,
516 task_queue_capacity,
517 min_hops,
518 max_hops,
519 timeout_ms,
520 max_routes,
521 } => {
522 let algo_cfg = AlgorithmConfig::new(
523 min_hops,
524 max_hops,
525 Duration::from_millis(timeout_ms),
526 max_routes,
527 )?;
528 WorkerPoolBuilder::new()
529 .name(name)
530 .algorithm(algorithm)
531 .algorithm_config(algo_cfg)
532 .num_workers(num_workers)
533 .task_queue_capacity(task_queue_capacity)
534 .build(
535 Arc::clone(&market_data),
536 Arc::clone(&derived_data),
537 pool_event_rx,
538 derived_rx,
539 )?
540 }
541 PoolEntry::Custom(custom) => {
542 let algo_cfg = AlgorithmConfig::new(
543 custom.min_hops,
544 custom.max_hops,
545 Duration::from_millis(custom.timeout_ms),
546 custom.max_routes,
547 )?;
548 let builder = WorkerPoolBuilder::new()
549 .name(custom.name)
550 .algorithm_config(algo_cfg)
551 .num_workers(custom.num_workers)
552 .task_queue_capacity(custom.task_queue_capacity);
553 let builder = (custom.configure)(builder);
554 builder.build(
555 Arc::clone(&market_data),
556 Arc::clone(&derived_data),
557 pool_event_rx,
558 derived_rx,
559 )?
560 }
561 };
562
563 solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
564 worker_pools.push(worker_pool);
565 }
566
567 let encoder = match self.encoder {
568 Some(enc) => enc,
569 None => {
570 let registry = SwapEncoderRegistry::new(self.chain)
571 .add_default_encoders(None)
572 .map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
573 Encoder::new(self.chain, registry)
574 .map_err(|e| SolverBuildError::Encoder(e.to_string()))?
575 }
576 };
577
578 let chain = self.chain;
579 let router_address = encoder.router_address().clone();
580
581 let router_config = WorkerPoolRouterConfig::default()
582 .with_timeout(self.router_timeout)
583 .with_min_responses(self.router_min_responses);
584 let router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);
585
586 let feed_handle = tokio::spawn(async move {
587 if let Err(e) = tycho_feed.run().await {
588 tracing::error!(error = %e, "tycho feed error");
589 }
590 });
591
592 let gas_price_handle = tokio::spawn(async move {
593 if let Err(e) = gas_price_fetcher.run().await {
594 tracing::error!(error = %e, "gas price fetcher error");
595 }
596 });
597
598 let computation_handle = tokio::spawn(async move {
599 computation_manager
600 .run(computation_event_rx, computation_shutdown_rx)
601 .await;
602 });
603
604 Ok(Solver {
605 router,
606 worker_pools,
607 market_data,
608 derived_data,
609 feed_handle,
610 gas_price_handle,
611 computation_handle,
612 computation_shutdown_tx,
613 chain,
614 router_address,
615 })
616 }
617}
618
/// A running solver produced by `FyndBuilder::build`.
///
/// Owns the router, the worker pools and the handles of the spawned background tasks.
pub struct Solver {
    /// Router that answers quote requests.
    router: WorkerPoolRouter,
    /// Worker pools, in the order they were configured on the builder.
    worker_pools: Vec<WorkerPool>,
    /// Market data shared with the feed, the gas price fetcher and the pools.
    market_data: SharedMarketDataRef,
    /// Derived data store obtained from the computation manager.
    derived_data: SharedDerivedDataRef,
    /// Handle of the spawned Tycho feed task.
    feed_handle: JoinHandle<()>,
    /// Handle of the spawned gas price fetcher task.
    gas_price_handle: JoinHandle<()>,
    /// Handle of the spawned computation manager task.
    computation_handle: JoinHandle<()>,
    /// Sender whose message tells the computation manager task to shut down.
    computation_shutdown_tx: broadcast::Sender<()>,
    /// Chain the solver was built for.
    chain: Chain,
    /// Router address copied from the encoder at build time.
    router_address: Bytes,
}
632
633impl Solver {
634 pub fn market_data(&self) -> SharedMarketDataRef {
636 Arc::clone(&self.market_data)
637 }
638
639 pub fn derived_data(&self) -> SharedDerivedDataRef {
641 Arc::clone(&self.derived_data)
642 }
643
644 pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
650 self.router.quote(request).await
651 }
652
653 pub async fn wait_until_ready(&self, timeout: Duration) -> Result<(), WaitReadyError> {
669 const POLL_INTERVAL: Duration = Duration::from_millis(500);
670
671 let deadline = tokio::time::Instant::now() + timeout;
672
673 loop {
674 let market_ready = self
675 .market_data
676 .read()
677 .await
678 .last_updated()
679 .is_some();
680 let derived_ready = self
681 .derived_data
682 .read()
683 .await
684 .last_block()
685 .is_some();
686
687 if market_ready && derived_ready {
688 return Ok(());
689 }
690
691 if tokio::time::Instant::now() >= deadline {
692 return Err(WaitReadyError { timeout_ms: timeout.as_millis() as u64 });
693 }
694
695 tokio::time::sleep(POLL_INTERVAL).await;
696 }
697 }
698
699 pub fn shutdown(self) {
701 let _ = self.computation_shutdown_tx.send(());
702 for pool in self.worker_pools {
703 pool.shutdown();
704 }
705 self.feed_handle.abort();
706 self.gas_price_handle.abort();
707 }
708
709 pub fn into_parts(self) -> SolverParts {
711 SolverParts {
712 router: self.router,
713 worker_pools: self.worker_pools,
714 market_data: self.market_data,
715 derived_data: self.derived_data,
716 feed_handle: self.feed_handle,
717 gas_price_handle: self.gas_price_handle,
718 computation_handle: self.computation_handle,
719 computation_shutdown_tx: self.computation_shutdown_tx,
720 chain: self.chain,
721 router_address: self.router_address,
722 }
723 }
724}
725
/// The components of a [`Solver`], produced by `Solver::into_parts`.
pub struct SolverParts {
    /// Router that answers quote requests.
    router: WorkerPoolRouter,
    /// Worker pools owned by the solver.
    worker_pools: Vec<WorkerPool>,
    /// Shared market data handle.
    market_data: SharedMarketDataRef,
    /// Shared derived data handle.
    derived_data: SharedDerivedDataRef,
    /// Handle of the Tycho feed task.
    feed_handle: JoinHandle<()>,
    /// Handle of the gas price fetcher task.
    gas_price_handle: JoinHandle<()>,
    /// Handle of the computation manager task.
    computation_handle: JoinHandle<()>,
    /// Sender used to signal the computation manager task to shut down.
    computation_shutdown_tx: broadcast::Sender<()>,
    /// Chain the solver was built for.
    chain: Chain,
    /// Router address copied from the encoder at build time.
    router_address: Bytes,
}
751
752impl SolverParts {
753 pub fn chain(&self) -> Chain {
755 self.chain
756 }
757
758 pub fn router_address(&self) -> &Bytes {
760 &self.router_address
761 }
762
763 pub fn worker_pools(&self) -> &[WorkerPool] {
765 &self.worker_pools
766 }
767
768 pub fn market_data(&self) -> &SharedMarketDataRef {
770 &self.market_data
771 }
772
773 pub fn derived_data(&self) -> &SharedDerivedDataRef {
775 &self.derived_data
776 }
777
778 pub fn into_router(self) -> WorkerPoolRouter {
780 self.router
781 }
782
783 #[allow(clippy::type_complexity)]
785 pub fn into_components(
786 self,
787 ) -> (
788 WorkerPoolRouter,
789 Vec<WorkerPool>,
790 SharedMarketDataRef,
791 SharedDerivedDataRef,
792 JoinHandle<()>,
793 JoinHandle<()>,
794 JoinHandle<()>,
795 broadcast::Sender<()>,
796 ) {
797 (
798 self.router,
799 self.worker_pools,
800 self.market_data,
801 self.derived_data,
802 self.feed_handle,
803 self.gas_price_handle,
804 self.computation_handle,
805 self.computation_shutdown_tx,
806 )
807 }
808}