1use std::{collections::HashSet, sync::Arc, time::Duration};
14
15use num_cpus;
16use serde::{Deserialize, Serialize};
17use tokio::{sync::broadcast, task::JoinHandle};
18use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
19use tycho_simulation::{
20 tycho_common::{models::Chain, Bytes},
21 tycho_ethereum::rpc::EthereumRpcClient,
22};
23
24use crate::{
25 algorithm::{AlgorithmConfig, AlgorithmError},
26 derived::{ComputationManager, ComputationManagerConfig, SharedDerivedDataRef},
27 encoding::encoder::Encoder,
28 feed::{
29 events::MarketEventHandler,
30 gas::GasPriceFetcher,
31 market_data::{SharedMarketData, SharedMarketDataRef},
32 tycho_feed::TychoFeed,
33 TychoFeedConfig,
34 },
35 graph::EdgeWeightUpdaterWithDerived,
36 types::constants::native_token,
37 worker_pool::{
38 pool::{WorkerPool, WorkerPoolBuilder},
39 registry::UnknownAlgorithmError,
40 },
41 worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter},
42 Algorithm, Quote, QuoteRequest, SolveError,
43};
44
/// Fallback values applied when an option is left unset.
///
/// The constants are public so callers can refer to the same defaults that `PoolConfig` and
/// `FyndBuilder` use.
pub mod defaults {
    use std::time::Duration;

    // --- Worker pool options -------------------------------------------------------------

    /// Default task queue capacity of a worker pool.
    pub const POOL_TASK_QUEUE_CAPACITY: usize = 1_000;
    /// Default minimum number of hops for a pool's algorithm.
    pub const POOL_MIN_HOPS: usize = 1;
    /// Default maximum number of hops for a pool's algorithm.
    pub const POOL_MAX_HOPS: usize = 3;
    /// Default per-pool algorithm timeout, in milliseconds.
    pub const POOL_TIMEOUT_MS: u64 = 100;

    // --- Router options ------------------------------------------------------------------

    /// Default `min_responses` value given to the worker pool router.
    pub const ROUTER_MIN_RESPONSES: usize = 0;

    // --- Feed options --------------------------------------------------------------------

    /// Default minimum token quality passed to the feed configuration.
    pub const MIN_TOKEN_QUALITY: i32 = 100;
    /// Default `traded_n_days_ago` value passed to the feed configuration.
    pub const TRADED_N_DAYS_AGO: u64 = 3;
    /// Default TVL buffer ratio passed to the feed configuration.
    pub const TVL_BUFFER_RATIO: f64 = 1.1;
    /// Default gas refresh interval passed to the feed configuration (30 seconds).
    pub const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
    /// Default reconnect delay passed to the feed configuration (5 seconds).
    pub const RECONNECT_DELAY: Duration = Duration::from_secs(5);
}
75
/// Router timeout a new `FyndBuilder` starts with (10 seconds).
const DEFAULT_ROUTER_TIMEOUT: Duration = Duration::from_secs(10);
/// Depth slippage threshold handed to the computation manager configuration.
const DEFAULT_DEPTH_SLIPPAGE_THRESHOLD: f64 = 0.01;
/// Whether a new `FyndBuilder` uses TLS for the Tycho connection.
const DEFAULT_TYCHO_USE_TLS: bool = true;
82
83fn default_task_queue_capacity() -> usize {
86 defaults::POOL_TASK_QUEUE_CAPACITY
87}
88
89fn default_min_hops() -> usize {
90 defaults::POOL_MIN_HOPS
91}
92
93fn default_max_hops() -> usize {
94 defaults::POOL_MAX_HOPS
95}
96
97fn default_algo_timeout_ms() -> u64 {
98 defaults::POOL_TIMEOUT_MS
99}
100
/// Settings for one worker pool that runs an algorithm selected by name.
///
/// Every field except `algorithm` has a serde default, so a serialized configuration only has
/// to name the algorithm.
#[must_use]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolConfig {
    /// Name of the algorithm the pool runs.
    algorithm: String,
    /// Number of workers in the pool; defaults to `num_cpus::get()`.
    #[serde(default = "num_cpus::get")]
    num_workers: usize,
    /// Capacity of the pool's task queue; defaults to `defaults::POOL_TASK_QUEUE_CAPACITY`.
    #[serde(default = "default_task_queue_capacity")]
    task_queue_capacity: usize,
    /// Minimum hop count given to the algorithm configuration; defaults to
    /// `defaults::POOL_MIN_HOPS`.
    #[serde(default = "default_min_hops")]
    min_hops: usize,
    /// Maximum hop count given to the algorithm configuration; defaults to
    /// `defaults::POOL_MAX_HOPS`.
    #[serde(default = "default_max_hops")]
    max_hops: usize,
    /// Algorithm timeout in milliseconds; defaults to `defaults::POOL_TIMEOUT_MS`.
    #[serde(default = "default_algo_timeout_ms")]
    timeout_ms: u64,
    /// Optional route limit given to the algorithm configuration; `None` when omitted.
    #[serde(default)]
    max_routes: Option<usize>,
}
126
127impl PoolConfig {
128 pub fn new(algorithm: impl Into<String>) -> Self {
130 Self {
131 algorithm: algorithm.into(),
132 num_workers: num_cpus::get(),
133 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
134 min_hops: defaults::POOL_MIN_HOPS,
135 max_hops: defaults::POOL_MAX_HOPS,
136 timeout_ms: defaults::POOL_TIMEOUT_MS,
137 max_routes: None,
138 }
139 }
140
141 pub fn algorithm(&self) -> &str {
143 &self.algorithm
144 }
145
146 pub fn num_workers(&self) -> usize {
148 self.num_workers
149 }
150
151 pub fn with_num_workers(mut self, num_workers: usize) -> Self {
153 self.num_workers = num_workers;
154 self
155 }
156
157 pub fn with_task_queue_capacity(mut self, task_queue_capacity: usize) -> Self {
159 self.task_queue_capacity = task_queue_capacity;
160 self
161 }
162
163 pub fn with_min_hops(mut self, min_hops: usize) -> Self {
165 self.min_hops = min_hops;
166 self
167 }
168
169 pub fn with_max_hops(mut self, max_hops: usize) -> Self {
171 self.max_hops = max_hops;
172 self
173 }
174
175 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
177 self.timeout_ms = timeout_ms;
178 self
179 }
180
181 pub fn with_max_routes(mut self, max_routes: Option<usize>) -> Self {
183 self.max_routes = max_routes;
184 self
185 }
186
187 pub fn task_queue_capacity(&self) -> usize {
189 self.task_queue_capacity
190 }
191
192 pub fn min_hops(&self) -> usize {
194 self.min_hops
195 }
196
197 pub fn max_hops(&self) -> usize {
199 self.max_hops
200 }
201
202 pub fn timeout_ms(&self) -> u64 {
204 self.timeout_ms
205 }
206
207 pub fn max_routes(&self) -> Option<usize> {
209 self.max_routes
210 }
211}
212
/// Error returned by `Solver::wait_until_ready` when the timeout elapses before both market
/// data and derived data are ready.
#[derive(Debug, thiserror::Error)]
#[error("timed out after {timeout_ms}ms waiting for market data and derived computations")]
pub struct WaitReadyError {
    /// The timeout that was exceeded, in milliseconds.
    timeout_ms: u64,
}
219
/// Errors that `FyndBuilder::build` can return.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum SolverBuildError {
    /// `EthereumRpcClient::new` failed; carries the underlying error rendered as text.
    #[error("failed to create ethereum RPC client: {0}")]
    RpcClient(String),
    /// An algorithm configuration was rejected (converted from `AlgorithmError`).
    #[error(transparent)]
    AlgorithmConfig(#[from] AlgorithmError),
    /// `ComputationManager::new` failed; carries the underlying error rendered as text.
    #[error("failed to create computation manager: {0}")]
    ComputationManager(String),
    /// Building the default swap encoder registry or the encoder failed; carries the
    /// underlying error rendered as text.
    #[error("failed to create encoder: {0}")]
    Encoder(String),
    /// Converted from `UnknownAlgorithmError`.
    // NOTE(review): presumably raised by `WorkerPoolBuilder::build` for an unregistered
    // algorithm name; confirm against the worker pool registry.
    #[error(transparent)]
    UnknownAlgorithm(#[from] UnknownAlgorithmError),
    /// `native_token` returned an error for the configured chain.
    #[error("gas token not configured for chain")]
    GasToken,
    /// `build` was called before any pool was registered.
    #[error("no worker pools configured")]
    NoPools,
}
246
/// A pool registered on `FyndBuilder`, turned into a `WorkerPool` by `FyndBuilder::build`.
enum PoolEntry {
    /// Pool whose algorithm is selected by name through `WorkerPoolBuilder::algorithm`.
    BuiltIn {
        /// Name given to the worker pool.
        name: String,
        /// Name of the algorithm to run.
        algorithm: String,
        /// Number of workers in the pool.
        num_workers: usize,
        /// Capacity of the pool's task queue.
        task_queue_capacity: usize,
        /// Minimum hop count for the algorithm configuration.
        min_hops: usize,
        /// Maximum hop count for the algorithm configuration.
        max_hops: usize,
        /// Algorithm timeout in milliseconds.
        timeout_ms: u64,
        /// Optional route limit for the algorithm configuration.
        max_routes: Option<usize>,
    },
    /// Pool whose algorithm is installed by a caller-supplied factory.
    Custom(CustomPoolEntry),
}
261
/// Settings for a pool that runs a custom algorithm registered via
/// `FyndBuilder::with_algorithm`.
struct CustomPoolEntry {
    /// Name given to the worker pool.
    name: String,
    /// Number of workers in the pool.
    num_workers: usize,
    /// Capacity of the pool's task queue.
    task_queue_capacity: usize,
    /// Minimum hop count for the algorithm configuration.
    min_hops: usize,
    /// Maximum hop count for the algorithm configuration.
    max_hops: usize,
    /// Algorithm timeout in milliseconds.
    timeout_ms: u64,
    /// Optional route limit for the algorithm configuration.
    max_routes: Option<usize>,
    /// One-shot hook applied to the partially configured `WorkerPoolBuilder` during `build`;
    /// `with_algorithm` uses it to register the algorithm factory on the builder.
    configure: Box<dyn FnOnce(WorkerPoolBuilder) -> WorkerPoolBuilder + Send>,
}
274
/// Builder that collects feed, router, encoder and pool settings and assembles a `Solver`.
///
/// At least one pool must be registered (`algorithm`, `with_algorithm` or `add_pool`) before
/// calling `build`, otherwise `build` returns `SolverBuildError::NoPools`.
#[must_use = "a builder does nothing until .build() is called"]
pub struct FyndBuilder {
    /// Chain passed to the feed configuration, the native-token lookup and the encoder.
    chain: Chain,
    /// URL passed to the Tycho feed configuration.
    tycho_url: String,
    /// URL used to create the Ethereum RPC client.
    rpc_url: String,
    /// Protocol list passed to the Tycho feed configuration.
    protocols: Vec<String>,
    /// Minimum TVL passed to the Tycho feed configuration.
    min_tvl: f64,
    /// Optional API key passed to the Tycho feed configuration.
    tycho_api_key: Option<String>,
    /// Whether the Tycho connection uses TLS.
    tycho_use_tls: bool,
    /// Minimum token quality passed to the feed configuration.
    min_token_quality: i32,
    /// `traded_n_days_ago` value passed to the feed configuration.
    traded_n_days_ago: u64,
    /// TVL buffer ratio passed to the feed configuration.
    tvl_buffer_ratio: f64,
    /// Gas refresh interval passed to the feed configuration.
    gas_refresh_interval: Duration,
    /// Reconnect delay passed to the feed configuration.
    reconnect_delay: Duration,
    /// Blacklisted components passed to the feed configuration.
    blacklisted_components: HashSet<String>,
    /// Timeout given to the worker pool router configuration.
    router_timeout: Duration,
    /// `min_responses` value given to the worker pool router configuration.
    router_min_responses: usize,
    /// Encoder override; when `None`, `build` creates one from the default encoder registry.
    encoder: Option<Encoder>,
    /// Pools registered so far, in registration order.
    pools: Vec<PoolEntry>,
}
298
299impl FyndBuilder {
300 pub fn new(
302 chain: Chain,
303 tycho_url: impl Into<String>,
304 rpc_url: impl Into<String>,
305 protocols: Vec<String>,
306 min_tvl: f64,
307 ) -> Self {
308 Self {
309 chain,
310 tycho_url: tycho_url.into(),
311 rpc_url: rpc_url.into(),
312 protocols,
313 min_tvl,
314 tycho_api_key: None,
315 tycho_use_tls: DEFAULT_TYCHO_USE_TLS,
316 min_token_quality: defaults::MIN_TOKEN_QUALITY,
317 traded_n_days_ago: defaults::TRADED_N_DAYS_AGO,
318 tvl_buffer_ratio: defaults::TVL_BUFFER_RATIO,
319 gas_refresh_interval: defaults::GAS_REFRESH_INTERVAL,
320 reconnect_delay: defaults::RECONNECT_DELAY,
321 blacklisted_components: HashSet::new(),
322 router_timeout: DEFAULT_ROUTER_TIMEOUT,
323 router_min_responses: defaults::ROUTER_MIN_RESPONSES,
324 encoder: None,
325 pools: Vec::new(),
326 }
327 }
328
    /// Returns the chain this builder was created for.
    pub fn chain(&self) -> Chain {
        self.chain
    }
333
334 pub fn tycho_api_key(mut self, key: impl Into<String>) -> Self {
336 self.tycho_api_key = Some(key.into());
337 self
338 }
339
340 pub fn min_tvl(mut self, min_tvl: f64) -> Self {
342 self.min_tvl = min_tvl;
343 self
344 }
345
346 pub fn tycho_use_tls(mut self, use_tls: bool) -> Self {
348 self.tycho_use_tls = use_tls;
349 self
350 }
351
352 pub fn min_token_quality(mut self, quality: i32) -> Self {
355 self.min_token_quality = quality;
356 self
357 }
358
359 pub fn traded_n_days_ago(mut self, days: u64) -> Self {
361 self.traded_n_days_ago = days;
362 self
363 }
364
365 pub fn tvl_buffer_ratio(mut self, ratio: f64) -> Self {
367 self.tvl_buffer_ratio = ratio;
368 self
369 }
370
371 pub fn gas_refresh_interval(mut self, interval: Duration) -> Self {
373 self.gas_refresh_interval = interval;
374 self
375 }
376
377 pub fn reconnect_delay(mut self, delay: Duration) -> Self {
379 self.reconnect_delay = delay;
380 self
381 }
382
383 pub fn blacklisted_components(mut self, components: HashSet<String>) -> Self {
385 self.blacklisted_components = components;
386 self
387 }
388
389 pub fn worker_router_timeout(mut self, timeout: Duration) -> Self {
391 self.router_timeout = timeout;
392 self
393 }
394
395 pub fn worker_router_min_responses(mut self, min: usize) -> Self {
397 self.router_min_responses = min;
398 self
399 }
400
401 pub fn encoder(mut self, encoder: Encoder) -> Self {
403 self.encoder = Some(encoder);
404 self
405 }
406
407 pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
409 self.pools.push(PoolEntry::BuiltIn {
410 name: "default".to_string(),
411 algorithm: algorithm.into(),
412 num_workers: num_cpus::get(),
413 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
414 min_hops: defaults::POOL_MIN_HOPS,
415 max_hops: defaults::POOL_MAX_HOPS,
416 timeout_ms: defaults::POOL_TIMEOUT_MS,
417 max_routes: None,
418 });
419 self
420 }
421
422 pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
426 where
427 A: Algorithm + 'static,
428 A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
429 F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
430 {
431 let name = name.into();
432 let algo_name = name.clone();
433 let configure =
434 Box::new(move |builder: WorkerPoolBuilder| builder.with_algorithm(algo_name, factory));
435 self.pools
436 .push(PoolEntry::Custom(CustomPoolEntry {
437 name,
438 num_workers: num_cpus::get(),
439 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
440 min_hops: defaults::POOL_MIN_HOPS,
441 max_hops: defaults::POOL_MAX_HOPS,
442 timeout_ms: defaults::POOL_TIMEOUT_MS,
443 max_routes: None,
444 configure,
445 }));
446 self
447 }
448
449 pub fn add_pool(mut self, name: impl Into<String>, config: &PoolConfig) -> Self {
451 self.pools.push(PoolEntry::BuiltIn {
452 name: name.into(),
453 algorithm: config.algorithm().to_string(),
454 num_workers: config.num_workers(),
455 task_queue_capacity: config.task_queue_capacity(),
456 min_hops: config.min_hops(),
457 max_hops: config.max_hops(),
458 timeout_ms: config.timeout_ms(),
459 max_routes: config.max_routes(),
460 });
461 self
462 }
463
    /// Assembles the feed, gas price fetcher, computation manager, worker pools, encoder and
    /// router, then spawns the background tasks and returns the resulting `Solver`.
    ///
    /// The three tasks spawned here (feed, gas price fetcher, computation manager) are only
    /// started after every fallible step in this function has succeeded.
    ///
    /// # Errors
    ///
    /// - `SolverBuildError::NoPools` if no pool was registered.
    /// - `SolverBuildError::RpcClient` if the Ethereum RPC client cannot be created.
    /// - `SolverBuildError::GasToken` if `native_token` fails for the chain.
    /// - `SolverBuildError::ComputationManager` if the computation manager cannot be created.
    /// - `SolverBuildError::Encoder` if the default encoder cannot be created.
    /// - Any error converted by `?` from `AlgorithmConfig::new` or `WorkerPoolBuilder::build`.
    ///
    /// # Panics
    ///
    /// Calls `tokio::spawn`, which panics when invoked outside a Tokio runtime.
    pub fn build(self) -> Result<Solver, SolverBuildError> {
        if self.pools.is_empty() {
            return Err(SolverBuildError::NoPools);
        }

        // Shared market state: cloned into the feed, the gas price fetcher, the computation
        // manager and every worker pool.
        let market_data = Arc::new(tokio::sync::RwLock::new(SharedMarketData::new()));

        let tycho_feed_config = TychoFeedConfig::new(
            self.tycho_url,
            self.chain,
            self.tycho_api_key,
            self.tycho_use_tls,
            self.protocols,
            self.min_tvl,
        )
        .tvl_buffer_ratio(self.tvl_buffer_ratio)
        .gas_refresh_interval(self.gas_refresh_interval)
        .reconnect_delay(self.reconnect_delay)
        .min_token_quality(self.min_token_quality)
        .traded_n_days_ago(self.traded_n_days_ago)
        .blacklisted_components(self.blacklisted_components);

        let ethereum_client = EthereumRpcClient::new(self.rpc_url.as_str())
            .map_err(|e| SolverBuildError::RpcClient(e.to_string()))?;

        // The fetcher hands back a signal sender, which is attached to the feed below.
        let (mut gas_price_fetcher, gas_price_worker_signal_tx) =
            GasPriceFetcher::new(ethereum_client, Arc::clone(&market_data));

        let mut tycho_feed = TychoFeed::new(tycho_feed_config, Arc::clone(&market_data));
        tycho_feed = tycho_feed.with_gas_price_worker_signal_tx(gas_price_worker_signal_tx);

        let gas_token = native_token(&self.chain).map_err(|_| SolverBuildError::GasToken)?;
        let computation_config = ComputationManagerConfig::new()
            .with_gas_token(gas_token)
            .with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
        // The second element of the returned tuple is not used here.
        let (computation_manager, _) =
            ComputationManager::new(computation_config, Arc::clone(&market_data))
                .map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;

        let derived_data: SharedDerivedDataRef = computation_manager.store();
        let derived_event_tx = computation_manager.event_sender();

        // The computation manager gets its own feed subscription and a shutdown channel.
        let computation_event_rx = tycho_feed.subscribe();
        let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);

        let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
        let mut worker_pools: Vec<WorkerPool> = Vec::new();

        for pool_entry in self.pools {
            // Every pool receives its own feed-event and derived-event subscriptions.
            let pool_event_rx = tycho_feed.subscribe();
            let derived_rx = derived_event_tx.subscribe();

            let (worker_pool, task_handle) = match pool_entry {
                PoolEntry::BuiltIn {
                    name,
                    algorithm,
                    num_workers,
                    task_queue_capacity,
                    min_hops,
                    max_hops,
                    timeout_ms,
                    max_routes,
                } => {
                    let algo_cfg = AlgorithmConfig::new(
                        min_hops,
                        max_hops,
                        Duration::from_millis(timeout_ms),
                        max_routes,
                    )?;
                    WorkerPoolBuilder::new()
                        .name(name)
                        .algorithm(algorithm)
                        .algorithm_config(algo_cfg)
                        .num_workers(num_workers)
                        .task_queue_capacity(task_queue_capacity)
                        .build(
                            Arc::clone(&market_data),
                            Arc::clone(&derived_data),
                            pool_event_rx,
                            derived_rx,
                        )?
                }
                PoolEntry::Custom(custom) => {
                    let algo_cfg = AlgorithmConfig::new(
                        custom.min_hops,
                        custom.max_hops,
                        Duration::from_millis(custom.timeout_ms),
                        custom.max_routes,
                    )?;
                    let builder = WorkerPoolBuilder::new()
                        .name(custom.name)
                        .algorithm_config(algo_cfg)
                        .num_workers(custom.num_workers)
                        .task_queue_capacity(custom.task_queue_capacity);
                    // The stored hook installs the custom algorithm on the builder.
                    let builder = (custom.configure)(builder);
                    builder.build(
                        Arc::clone(&market_data),
                        Arc::clone(&derived_data),
                        pool_event_rx,
                        derived_rx,
                    )?
                }
            };

            solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
            worker_pools.push(worker_pool);
        }

        // Use the caller's encoder if one was supplied; otherwise build one from the default
        // encoder registry for this chain.
        let encoder = match self.encoder {
            Some(enc) => enc,
            None => {
                let registry = SwapEncoderRegistry::new(self.chain)
                    .add_default_encoders(None)
                    .map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
                Encoder::new(self.chain, registry)
                    .map_err(|e| SolverBuildError::Encoder(e.to_string()))?
            }
        };

        let chain = self.chain;
        // Cloned now because the encoder is moved into the router just below.
        let router_address = encoder.router_address().clone();

        let router_config = WorkerPoolRouterConfig::default()
            .with_timeout(self.router_timeout)
            .with_min_responses(self.router_min_responses);
        let router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);

        // Each task logs the error and ends if its `run` future returns `Err`.
        let feed_handle = tokio::spawn(async move {
            if let Err(e) = tycho_feed.run().await {
                tracing::error!(error = %e, "tycho feed error");
            }
        });

        let gas_price_handle = tokio::spawn(async move {
            if let Err(e) = gas_price_fetcher.run().await {
                tracing::error!(error = %e, "gas price fetcher error");
            }
        });

        let computation_handle = tokio::spawn(async move {
            computation_manager
                .run(computation_event_rx, computation_shutdown_rx)
                .await;
        });

        Ok(Solver {
            router,
            worker_pools,
            market_data,
            derived_data,
            feed_handle,
            gas_price_handle,
            computation_handle,
            computation_shutdown_tx,
            chain,
            router_address,
        })
    }
630}
631
/// A running solver produced by `FyndBuilder::build`.
///
/// Owns the router, the worker pools, the shared data stores and the handles of the
/// background tasks spawned during `build`.
pub struct Solver {
    /// Router that answers quote requests.
    router: WorkerPoolRouter,
    /// Worker pools created for the registered pool entries.
    worker_pools: Vec<WorkerPool>,
    /// Shared market data store.
    market_data: SharedMarketDataRef,
    /// Shared derived data store, obtained from the computation manager.
    derived_data: SharedDerivedDataRef,
    /// Handle of the task running the Tycho feed.
    feed_handle: JoinHandle<()>,
    /// Handle of the task running the gas price fetcher.
    gas_price_handle: JoinHandle<()>,
    /// Handle of the task running the computation manager.
    computation_handle: JoinHandle<()>,
    /// Sender whose receiver was handed to the computation manager's `run` call.
    computation_shutdown_tx: broadcast::Sender<()>,
    /// Chain the solver was built for.
    chain: Chain,
    /// Router address reported by the encoder at build time.
    router_address: Bytes,
}
645
646impl Solver {
    /// Returns a new shared handle to the market data store.
    pub fn market_data(&self) -> SharedMarketDataRef {
        Arc::clone(&self.market_data)
    }
651
    /// Returns a new shared handle to the derived data store.
    pub fn derived_data(&self) -> SharedDerivedDataRef {
        Arc::clone(&self.derived_data)
    }
656
    /// Forwards `request` to the worker pool router and returns its result.
    ///
    /// # Errors
    ///
    /// Returns whatever `SolveError` the router reports.
    pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
        self.router.quote(request).await
    }
665
666 pub async fn wait_until_ready(&self, timeout: Duration) -> Result<(), WaitReadyError> {
682 const POLL_INTERVAL: Duration = Duration::from_millis(500);
683
684 let deadline = tokio::time::Instant::now() + timeout;
685
686 loop {
687 let market_ready = self
688 .market_data
689 .read()
690 .await
691 .last_updated()
692 .is_some();
693 let derived_ready = self
694 .derived_data
695 .read()
696 .await
697 .derived_data_ready();
698
699 if market_ready && derived_ready {
700 return Ok(());
701 }
702
703 if tokio::time::Instant::now() >= deadline {
704 return Err(WaitReadyError { timeout_ms: timeout.as_millis() as u64 });
705 }
706
707 tokio::time::sleep(POLL_INTERVAL).await;
708 }
709 }
710
711 pub fn shutdown(self) {
713 let _ = self.computation_shutdown_tx.send(());
714 for pool in self.worker_pools {
715 pool.shutdown();
716 }
717 self.feed_handle.abort();
718 self.gas_price_handle.abort();
719 }
720
721 pub fn into_parts(self) -> SolverParts {
723 SolverParts {
724 router: self.router,
725 worker_pools: self.worker_pools,
726 market_data: self.market_data,
727 derived_data: self.derived_data,
728 feed_handle: self.feed_handle,
729 gas_price_handle: self.gas_price_handle,
730 computation_handle: self.computation_handle,
731 computation_shutdown_tx: self.computation_shutdown_tx,
732 chain: self.chain,
733 router_address: self.router_address,
734 }
735 }
736}
737
/// The components of a `Solver`, produced by `Solver::into_parts`.
///
/// Lets callers take ownership of the router, pools, data stores and task handles
/// individually.
pub struct SolverParts {
    /// Router that answers quote requests.
    router: WorkerPoolRouter,
    /// Worker pools created for the registered pool entries.
    worker_pools: Vec<WorkerPool>,
    /// Shared market data store.
    market_data: SharedMarketDataRef,
    /// Shared derived data store.
    derived_data: SharedDerivedDataRef,
    /// Handle of the task running the Tycho feed.
    feed_handle: JoinHandle<()>,
    /// Handle of the task running the gas price fetcher.
    gas_price_handle: JoinHandle<()>,
    /// Handle of the task running the computation manager.
    computation_handle: JoinHandle<()>,
    /// Sender of the computation manager's shutdown signal.
    computation_shutdown_tx: broadcast::Sender<()>,
    /// Chain the solver was built for.
    chain: Chain,
    /// Router address reported by the encoder at build time.
    router_address: Bytes,
}
763
764impl SolverParts {
765 pub fn chain(&self) -> Chain {
767 self.chain
768 }
769
770 pub fn router_address(&self) -> &Bytes {
772 &self.router_address
773 }
774
775 pub fn worker_pools(&self) -> &[WorkerPool] {
777 &self.worker_pools
778 }
779
780 pub fn market_data(&self) -> &SharedMarketDataRef {
782 &self.market_data
783 }
784
785 pub fn derived_data(&self) -> &SharedDerivedDataRef {
787 &self.derived_data
788 }
789
790 pub fn into_router(self) -> WorkerPoolRouter {
792 self.router
793 }
794
795 #[allow(clippy::type_complexity)]
797 pub fn into_components(
798 self,
799 ) -> (
800 WorkerPoolRouter,
801 Vec<WorkerPool>,
802 SharedMarketDataRef,
803 SharedDerivedDataRef,
804 JoinHandle<()>,
805 JoinHandle<()>,
806 JoinHandle<()>,
807 broadcast::Sender<()>,
808 ) {
809 (
810 self.router,
811 self.worker_pools,
812 self.market_data,
813 self.derived_data,
814 self.feed_handle,
815 self.gas_price_handle,
816 self.computation_handle,
817 self.computation_shutdown_tx,
818 )
819 }
820}