1use std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration};
14
15use num_cpus;
16use serde::{Deserialize, Serialize};
17use tokio::{sync::broadcast, task::JoinHandle};
18use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
19#[cfg(feature = "test-utils")]
20use tycho_simulation::tycho_ethereum::gas::{BlockGasPrice, GasPrice};
21use tycho_simulation::{
22 evm::pending::PendingBlockProcessor,
23 tycho_common::{models::Chain, traits::TxDeltaIndexer, Bytes},
24 tycho_core::models::Address,
25 tycho_ethereum::rpc::EthereumRpcClient,
26};
27
28use crate::{
29 algorithm::{AlgorithmConfig, AlgorithmError},
30 derived::{ComputationManager, ComputationManagerConfig, SharedDerivedDataRef},
31 encoding::{encoder::Encoder, fee_fetcher::RouterFeeFetcher, router_fees::SharedRouterFees},
32 feed::{
33 events::{MarketEvent, MarketEventHandler},
34 gas::GasPriceFetcher,
35 market_data::MarketData,
36 tycho_feed::TychoFeed,
37 TychoFeedConfig,
38 },
39 graph::EdgeWeightUpdaterWithDerived,
40 price_guard::{
41 guard::PriceGuard, provider::PriceProvider, provider_registry::PriceProviderRegistry,
42 },
43 types::constants::native_token,
44 worker_pool::{
45 pool::{WorkerPool, WorkerPoolBuilder},
46 registry::UnknownAlgorithmError,
47 },
48 worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter},
49 Algorithm, Quote, QuoteRequest, SolveError,
50};
51
52pub mod defaults {
58 use std::time::Duration;
59
60 pub const MIN_TOKEN_QUALITY: i32 = 100;
62 pub const TRADED_N_DAYS_AGO: u64 = 3;
64 pub const TVL_BUFFER_RATIO: f64 = 1.1;
66 pub const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
68 pub const ROUTER_FEE_REFRESH_INTERVAL: Duration = Duration::from_secs(300);
70 pub const RECONNECT_DELAY: Duration = Duration::from_secs(5);
72 pub const ROUTER_MIN_RESPONSES: usize = 0;
75 pub const POOL_TASK_QUEUE_CAPACITY: usize = 1000;
77 pub const POOL_MIN_HOPS: usize = 1;
79 pub const POOL_MAX_HOPS: usize = 3;
81 pub const POOL_TIMEOUT_MS: u64 = 100;
83}
84
85const DEFAULT_TYCHO_USE_TLS: bool = true;
87const DEFAULT_DEPTH_SLIPPAGE_THRESHOLD: f64 = 0.01;
88const DEFAULT_ROUTER_TIMEOUT: Duration = Duration::from_secs(10);
91
92fn default_task_queue_capacity() -> usize {
95 defaults::POOL_TASK_QUEUE_CAPACITY
96}
97
98fn default_min_hops() -> usize {
99 defaults::POOL_MIN_HOPS
100}
101
102fn default_max_hops() -> usize {
103 defaults::POOL_MAX_HOPS
104}
105
106fn default_algo_timeout_ms() -> u64 {
107 defaults::POOL_TIMEOUT_MS
108}
109
110fn parse_connector_tokens(
111 raw: Option<&[String]>,
112) -> Result<Option<HashSet<Address>>, SolverBuildError> {
113 let Some(strings) = raw else {
114 return Ok(None);
115 };
116 let mut set = HashSet::with_capacity(strings.len());
117 for s in strings {
118 let addr = Address::from_str(s).map_err(|e| AlgorithmError::InvalidConfiguration {
119 reason: format!("connector_tokens: invalid address {s:?}: {e}"),
120 })?;
121 set.insert(addr);
122 }
123 Ok(Some(set))
124}
125
126#[must_use]
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct PoolConfig {
130 algorithm: String,
132 #[serde(default = "num_cpus::get")]
134 num_workers: usize,
135 #[serde(default = "default_task_queue_capacity")]
137 task_queue_capacity: usize,
138 #[serde(default = "default_min_hops")]
140 min_hops: usize,
141 #[serde(default = "default_max_hops")]
143 max_hops: usize,
144 #[serde(default = "default_algo_timeout_ms")]
146 timeout_ms: u64,
147 #[serde(default)]
149 max_routes: Option<usize>,
150 #[serde(default)]
153 connector_tokens: Option<Vec<String>>,
154}
155
156impl PoolConfig {
157 pub fn new(algorithm: impl Into<String>) -> Self {
159 Self {
160 algorithm: algorithm.into(),
161 num_workers: num_cpus::get(),
162 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
163 min_hops: defaults::POOL_MIN_HOPS,
164 max_hops: defaults::POOL_MAX_HOPS,
165 timeout_ms: defaults::POOL_TIMEOUT_MS,
166 max_routes: None,
167 connector_tokens: None,
168 }
169 }
170
171 pub fn algorithm(&self) -> &str {
173 &self.algorithm
174 }
175
176 pub fn num_workers(&self) -> usize {
178 self.num_workers
179 }
180
181 pub fn with_num_workers(mut self, num_workers: usize) -> Self {
183 self.num_workers = num_workers;
184 self
185 }
186
187 pub fn with_task_queue_capacity(mut self, task_queue_capacity: usize) -> Self {
189 self.task_queue_capacity = task_queue_capacity;
190 self
191 }
192
193 pub fn with_min_hops(mut self, min_hops: usize) -> Self {
195 self.min_hops = min_hops;
196 self
197 }
198
199 pub fn with_max_hops(mut self, max_hops: usize) -> Self {
201 self.max_hops = max_hops;
202 self
203 }
204
205 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
207 self.timeout_ms = timeout_ms;
208 self
209 }
210
211 pub fn with_max_routes(mut self, max_routes: Option<usize>) -> Self {
213 self.max_routes = max_routes;
214 self
215 }
216
217 pub fn task_queue_capacity(&self) -> usize {
219 self.task_queue_capacity
220 }
221
222 pub fn min_hops(&self) -> usize {
224 self.min_hops
225 }
226
227 pub fn max_hops(&self) -> usize {
229 self.max_hops
230 }
231
232 pub fn timeout_ms(&self) -> u64 {
234 self.timeout_ms
235 }
236
237 pub fn max_routes(&self) -> Option<usize> {
239 self.max_routes
240 }
241
242 pub fn with_connector_tokens(mut self, tokens: Vec<String>) -> Self {
245 self.connector_tokens = Some(tokens);
246 self
247 }
248
249 pub fn connector_tokens(&self) -> Option<&[String]> {
251 self.connector_tokens.as_deref()
252 }
253}
254
255#[derive(Debug, thiserror::Error)]
257#[error("timed out after {timeout_ms}ms waiting for market data and derived computations")]
258pub struct WaitReadyError {
259 timeout_ms: u64,
260}
261
262#[non_exhaustive]
264#[derive(Debug, thiserror::Error)]
265pub enum SolverBuildError {
266 #[error("failed to create ethereum RPC client: {0}")]
268 RpcClient(String),
269 #[error(transparent)]
271 AlgorithmConfig(#[from] AlgorithmError),
272 #[error("failed to create computation manager: {0}")]
274 ComputationManager(String),
275 #[error("failed to create encoder: {0}")]
277 Encoder(String),
278 #[error("failed to create router fee fetcher: {0}")]
280 RouterFeeFetcher(String),
281 #[error(transparent)]
283 UnknownAlgorithm(#[from] UnknownAlgorithmError),
284 #[error("gas token not configured for chain")]
286 GasToken,
287 #[error("no worker pools configured")]
289 NoPools,
290 #[cfg(feature = "test-utils")]
292 #[error("replay failed: {0}")]
293 Replay(String),
294 #[error("feed setup failed before delivering pending processor: {0}")]
299 FeedSetup(String),
300 #[error("pending processor channel closed before processor was delivered")]
303 PendingChannelClosed,
304}
305
306enum PoolEntry {
308 BuiltIn {
309 name: String,
310 algorithm: String,
311 num_workers: usize,
312 task_queue_capacity: usize,
313 min_hops: usize,
314 max_hops: usize,
315 timeout_ms: u64,
316 max_routes: Option<usize>,
317 connector_tokens: Option<HashSet<Address>>,
318 },
319 Custom(CustomPoolEntry),
320}
321
322struct CustomPoolEntry {
324 name: String,
325 num_workers: usize,
326 task_queue_capacity: usize,
327 min_hops: usize,
328 max_hops: usize,
329 timeout_ms: u64,
330 max_routes: Option<usize>,
331 configure: Box<dyn FnOnce(WorkerPoolBuilder) -> WorkerPoolBuilder + Send>,
333}
334
335struct BuiltComponents {
338 tycho_feed: TychoFeed,
339 gas_price_fetcher: GasPriceFetcher<EthereumRpcClient>,
340 router_fee_fetcher: RouterFeeFetcher,
341 computation_manager: ComputationManager,
342 computation_event_rx: broadcast::Receiver<MarketEvent>,
343 computation_shutdown_tx: broadcast::Sender<()>,
344 computation_shutdown_rx: broadcast::Receiver<()>,
345 router: WorkerPoolRouter,
346 worker_pools: Vec<WorkerPool>,
347 market_data: MarketData,
348 derived_data: SharedDerivedDataRef,
349 router_fees: SharedRouterFees,
350 chain: Chain,
351 router_address: Bytes,
352 pending_indexers: Vec<(String, Box<dyn TxDeltaIndexer>)>,
353 market_event_tx: broadcast::Sender<MarketEvent>,
354}
355
356#[must_use = "a builder does nothing until .build() is called"]
361pub struct FyndBuilder {
362 chain: Chain,
363 tycho_url: String,
364 rpc_url: String,
365 protocols: Vec<String>,
366 min_tvl: f64,
367 tycho_api_key: Option<String>,
368 tycho_use_tls: bool,
369 min_token_quality: i32,
370 traded_n_days_ago: u64,
371 tvl_buffer_ratio: f64,
372 gas_refresh_interval: Duration,
373 reconnect_delay: Duration,
374 blocklisted_components: HashSet<String>,
375 partial_blocks: bool,
376 router_timeout: Duration,
377 router_min_responses: usize,
378 encoder: Option<Encoder>,
379 pools: Vec<PoolEntry>,
380 price_guard_enabled: bool,
381 price_providers: Vec<Box<dyn PriceProvider>>,
382 pending_indexers: Vec<(String, Box<dyn TxDeltaIndexer>)>,
383}
384
385impl FyndBuilder {
386 pub fn new(
388 chain: Chain,
389 tycho_url: impl Into<String>,
390 rpc_url: impl Into<String>,
391 protocols: Vec<String>,
392 min_tvl: f64,
393 ) -> Self {
394 Self {
395 chain,
396 tycho_url: tycho_url.into(),
397 rpc_url: rpc_url.into(),
398 protocols,
399 min_tvl,
400 tycho_api_key: None,
401 tycho_use_tls: DEFAULT_TYCHO_USE_TLS,
402 min_token_quality: defaults::MIN_TOKEN_QUALITY,
403 traded_n_days_ago: defaults::TRADED_N_DAYS_AGO,
404 tvl_buffer_ratio: defaults::TVL_BUFFER_RATIO,
405 gas_refresh_interval: defaults::GAS_REFRESH_INTERVAL,
406 reconnect_delay: defaults::RECONNECT_DELAY,
407 blocklisted_components: HashSet::new(),
408 partial_blocks: false,
409 router_timeout: DEFAULT_ROUTER_TIMEOUT,
410 router_min_responses: defaults::ROUTER_MIN_RESPONSES,
411 encoder: None,
412 pools: Vec::new(),
413 price_guard_enabled: false,
414 price_providers: Vec::new(),
415 pending_indexers: Vec::new(),
416 }
417 }
418
419 pub fn chain(&self) -> Chain {
421 self.chain
422 }
423
424 pub fn tycho_api_key(mut self, key: impl Into<String>) -> Self {
426 self.tycho_api_key = Some(key.into());
427 self
428 }
429
430 pub fn min_tvl(mut self, min_tvl: f64) -> Self {
432 self.min_tvl = min_tvl;
433 self
434 }
435
436 pub fn tycho_use_tls(mut self, use_tls: bool) -> Self {
438 self.tycho_use_tls = use_tls;
439 self
440 }
441
442 pub fn min_token_quality(mut self, quality: i32) -> Self {
445 self.min_token_quality = quality;
446 self
447 }
448
449 pub fn traded_n_days_ago(mut self, days: u64) -> Self {
451 self.traded_n_days_ago = days;
452 self
453 }
454
455 pub fn tvl_buffer_ratio(mut self, ratio: f64) -> Self {
457 self.tvl_buffer_ratio = ratio;
458 self
459 }
460
461 pub fn gas_refresh_interval(mut self, interval: Duration) -> Self {
463 self.gas_refresh_interval = interval;
464 self
465 }
466
467 pub fn reconnect_delay(mut self, delay: Duration) -> Self {
469 self.reconnect_delay = delay;
470 self
471 }
472
473 pub fn blocklisted_components(mut self, components: HashSet<String>) -> Self {
475 self.blocklisted_components = components;
476 self
477 }
478
479 pub fn partial_blocks(mut self, enabled: bool) -> Self {
485 self.partial_blocks = enabled;
486 self
487 }
488
489 pub fn worker_router_timeout(mut self, timeout: Duration) -> Self {
491 self.router_timeout = timeout;
492 self
493 }
494
495 pub fn worker_router_min_responses(mut self, min: usize) -> Self {
497 self.router_min_responses = min;
498 self
499 }
500
501 pub fn encoder(mut self, encoder: Encoder) -> Self {
503 self.encoder = Some(encoder);
504 self
505 }
506
507 pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
509 self.pools.push(PoolEntry::BuiltIn {
510 name: "default".to_string(),
511 algorithm: algorithm.into(),
512 num_workers: num_cpus::get(),
513 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
514 min_hops: defaults::POOL_MIN_HOPS,
515 max_hops: defaults::POOL_MAX_HOPS,
516 timeout_ms: defaults::POOL_TIMEOUT_MS,
517 max_routes: None,
518 connector_tokens: None,
519 });
520 self
521 }
522
523 pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
527 where
528 A: Algorithm + 'static,
529 A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
530 F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
531 {
532 let name = name.into();
533 let algo_name = name.clone();
534 let configure =
535 Box::new(move |builder: WorkerPoolBuilder| builder.with_algorithm(algo_name, factory));
536 self.pools
537 .push(PoolEntry::Custom(CustomPoolEntry {
538 name,
539 num_workers: num_cpus::get(),
540 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
541 min_hops: defaults::POOL_MIN_HOPS,
542 max_hops: defaults::POOL_MAX_HOPS,
543 timeout_ms: defaults::POOL_TIMEOUT_MS,
544 max_routes: None,
545 configure,
546 }));
547 self
548 }
549
550 pub fn add_default_price_providers(self) -> Self {
557 self.register_price_provider(Box::new(
558 crate::price_guard::hyperliquid::HyperliquidProvider::default(),
559 ))
560 .register_price_provider(Box::new(
561 crate::price_guard::binance_ws::BinanceWsProvider::default(),
562 ))
563 }
564
565 pub fn register_price_provider(mut self, provider: Box<dyn PriceProvider>) -> Self {
570 self.price_providers.push(provider);
571 self
572 }
573
574 pub fn with_pending_indexer(
580 mut self,
581 extractor: impl Into<String>,
582 indexer: Box<dyn TxDeltaIndexer>,
583 ) -> Self {
584 self.pending_indexers
585 .push((extractor.into(), indexer));
586 self
587 }
588
589 pub fn price_guard_enabled(mut self, enabled: bool) -> Self {
596 self.price_guard_enabled = enabled;
597 self
598 }
599
600 pub fn add_pool(
607 mut self,
608 name: impl Into<String>,
609 config: &PoolConfig,
610 ) -> Result<Self, SolverBuildError> {
611 let connector_tokens = parse_connector_tokens(config.connector_tokens())?;
612 self.pools.push(PoolEntry::BuiltIn {
613 name: name.into(),
614 algorithm: config.algorithm().to_string(),
615 num_workers: config.num_workers(),
616 task_queue_capacity: config.task_queue_capacity(),
617 min_hops: config.min_hops(),
618 max_hops: config.max_hops(),
619 timeout_ms: config.timeout_ms(),
620 max_routes: config.max_routes(),
621 connector_tokens,
622 });
623 Ok(self)
624 }
625
626 fn assemble_components(mut self) -> Result<BuiltComponents, SolverBuildError> {
629 if self.pools.is_empty() {
630 return Err(SolverBuildError::NoPools);
631 }
632
633 if self.price_providers.is_empty() {
635 self = self.add_default_price_providers();
636 }
637
638 let market_data = MarketData::new_shared();
639
640 let tycho_feed_config = TychoFeedConfig::new(
641 self.tycho_url,
642 self.chain,
643 self.tycho_api_key,
644 self.tycho_use_tls,
645 self.protocols,
646 self.min_tvl,
647 )
648 .tvl_buffer_ratio(self.tvl_buffer_ratio)
649 .reconnect_delay(self.reconnect_delay)
650 .min_token_quality(self.min_token_quality)
651 .traded_n_days_ago(self.traded_n_days_ago)
652 .blocklisted_components(self.blocklisted_components)
653 .partial_blocks(self.partial_blocks);
654
655 let ethereum_client = EthereumRpcClient::new(self.rpc_url.as_str())
656 .map_err(|e| SolverBuildError::RpcClient(e.to_string()))?;
657
658 let gas_price_fetcher =
659 GasPriceFetcher::new(ethereum_client, market_data.clone(), self.gas_refresh_interval);
660
661 let tycho_feed = TychoFeed::new(tycho_feed_config, market_data.clone());
662 let market_event_tx = tycho_feed.event_sender();
663
664 let gas_token = native_token(&self.chain).map_err(|_| SolverBuildError::GasToken)?;
665 let computation_config = ComputationManagerConfig::new()
666 .with_gas_token(gas_token)
667 .with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
668 let (computation_manager, _) =
671 ComputationManager::new(computation_config, market_data.clone())
672 .map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;
673
674 let derived_data: SharedDerivedDataRef = computation_manager.store();
675 let derived_event_tx = computation_manager.event_sender();
676
677 let computation_event_rx = tycho_feed.subscribe();
679 let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);
680
681 let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
682 let mut worker_pools: Vec<WorkerPool> = Vec::new();
683
684 for pool_entry in self.pools {
685 let pool_event_rx = tycho_feed.subscribe();
686 let derived_rx = derived_event_tx.subscribe();
687
688 let (worker_pool, task_handle) = match pool_entry {
689 PoolEntry::BuiltIn {
690 name,
691 algorithm,
692 num_workers,
693 task_queue_capacity,
694 min_hops,
695 max_hops,
696 timeout_ms,
697 max_routes,
698 connector_tokens,
699 } => {
700 let mut algo_cfg = AlgorithmConfig::new(
701 min_hops,
702 max_hops,
703 Duration::from_millis(timeout_ms),
704 max_routes,
705 )?;
706 if let Some(tokens) = connector_tokens {
707 algo_cfg = algo_cfg.with_connector_tokens(tokens);
708 }
709 WorkerPoolBuilder::new()
710 .name(name)
711 .algorithm(algorithm)
712 .algorithm_config(algo_cfg)
713 .num_workers(num_workers)
714 .task_queue_capacity(task_queue_capacity)
715 .build(
716 market_data.clone(),
717 Arc::clone(&derived_data),
718 pool_event_rx,
719 derived_rx,
720 )?
721 }
722 PoolEntry::Custom(custom) => {
723 let algo_cfg = AlgorithmConfig::new(
724 custom.min_hops,
725 custom.max_hops,
726 Duration::from_millis(custom.timeout_ms),
727 custom.max_routes,
728 )?;
729 let builder = WorkerPoolBuilder::new()
730 .name(custom.name)
731 .algorithm_config(algo_cfg)
732 .num_workers(custom.num_workers)
733 .task_queue_capacity(custom.task_queue_capacity);
734 let builder = (custom.configure)(builder);
735 builder.build(
736 market_data.clone(),
737 Arc::clone(&derived_data),
738 pool_event_rx,
739 derived_rx,
740 )?
741 }
742 };
743
744 solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
745 worker_pools.push(worker_pool);
746 }
747
748 let encoder = match self.encoder {
749 Some(enc) => enc,
750 None => {
751 let registry = SwapEncoderRegistry::new(self.chain)
752 .add_default_encoders(None)
753 .map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
754 Encoder::new(self.chain, registry)
755 .map_err(|e| SolverBuildError::Encoder(e.to_string()))?
756 }
757 };
758
759 let chain = self.chain;
760 let router_address = encoder.router_address().clone();
761 let router_fees = encoder.router_fees();
762
763 let router_fee_fetcher = RouterFeeFetcher::new(
764 self.rpc_url.as_str(),
765 &router_address,
766 router_fees.clone(),
767 defaults::ROUTER_FEE_REFRESH_INTERVAL,
768 )
769 .map_err(|e| SolverBuildError::RouterFeeFetcher(e.to_string()))?;
770
771 let router_config = WorkerPoolRouterConfig::default()
774 .with_timeout(self.router_timeout)
775 .with_min_responses(self.router_min_responses);
776 let mut router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);
777
778 if self.price_guard_enabled {
779 let mut registry = PriceProviderRegistry::new();
780 let mut worker_handles = Vec::new();
781 for mut provider in self.price_providers {
782 worker_handles.push(provider.start(market_data.clone()));
783 registry = registry.register(provider);
784 }
785 let price_guard = PriceGuard::new(registry, worker_handles);
786 router = router.with_price_guard(price_guard);
787 }
788
789 Ok(BuiltComponents {
790 tycho_feed,
791 gas_price_fetcher,
792 router_fee_fetcher,
793 computation_manager,
794 computation_event_rx,
795 computation_shutdown_tx,
796 computation_shutdown_rx,
797 router,
798 worker_pools,
799 market_data,
800 derived_data,
801 router_fees,
802 chain,
803 router_address,
804 pending_indexers: self.pending_indexers,
805 market_event_tx,
806 })
807 }
808
809 pub fn build(self) -> Result<Solver, SolverBuildError> {
815 let mut c = self.assemble_components()?;
816
817 let feed_handle = tokio::spawn(async move {
818 if let Err(e) = c.tycho_feed.run().await {
819 tracing::error!(error = %e, "tycho feed error");
820 }
821 });
822 let gas_price_handle = tokio::spawn(async move {
823 c.gas_price_fetcher.run().await;
824 });
825 let router_fee_handle = tokio::spawn(async move {
826 c.router_fee_fetcher.run().await;
827 });
828 let computation_handle = tokio::spawn(async move {
829 c.computation_manager
830 .run(c.computation_event_rx, c.computation_shutdown_rx)
831 .await;
832 });
833
834 Ok(Solver {
835 router: c.router,
836 worker_pools: c.worker_pools,
837 market_data: c.market_data,
838 derived_data: c.derived_data,
839 router_fees: c.router_fees,
840 feed_handle,
841 gas_price_handle,
842 router_fee_handle,
843 computation_handle,
844 computation_shutdown_tx: c.computation_shutdown_tx,
845 chain: c.chain,
846 router_address: c.router_address,
847 market_event_tx: c.market_event_tx,
848 })
849 }
850
851 pub async fn build_with_pending(
863 self,
864 ) -> Result<(Solver, PendingBlockProcessor), SolverBuildError> {
865 let mut c = self.assemble_components()?;
866
867 let (pending_tx, pending_rx) =
868 tokio::sync::oneshot::channel::<Result<PendingBlockProcessor, String>>();
869
870 let pending_indexers = c.pending_indexers;
871 let feed_handle = tokio::spawn(async move {
872 if let Err(e) = c
873 .tycho_feed
874 .run_with_pending(pending_tx, pending_indexers)
875 .await
876 {
877 tracing::error!(error = %e, "tycho feed error");
878 }
879 });
880 let gas_price_handle = tokio::spawn(async move {
881 c.gas_price_fetcher.run().await;
882 });
883 let router_fee_handle = tokio::spawn(async move {
884 c.router_fee_fetcher.run().await;
885 });
886 let computation_handle = tokio::spawn(async move {
887 c.computation_manager
888 .run(c.computation_event_rx, c.computation_shutdown_rx)
889 .await;
890 });
891
892 let pending = pending_rx
893 .await
894 .map_err(|_| SolverBuildError::PendingChannelClosed)?
895 .map_err(SolverBuildError::FeedSetup)?;
896
897 Ok((
898 Solver {
899 router: c.router,
900 worker_pools: c.worker_pools,
901 market_data: c.market_data,
902 derived_data: c.derived_data,
903 router_fees: c.router_fees,
904 feed_handle,
905 gas_price_handle,
906 router_fee_handle,
907 computation_handle,
908 computation_shutdown_tx: c.computation_shutdown_tx,
909 chain: c.chain,
910 router_address: c.router_address,
911 market_event_tx: c.market_event_tx,
912 },
913 pending,
914 ))
915 }
916}
917
918pub struct Solver {
920 router: WorkerPoolRouter,
921 worker_pools: Vec<WorkerPool>,
922 market_data: MarketData,
923 derived_data: SharedDerivedDataRef,
924 router_fees: SharedRouterFees,
925 feed_handle: JoinHandle<()>,
926 gas_price_handle: JoinHandle<()>,
927 router_fee_handle: JoinHandle<()>,
928 computation_handle: JoinHandle<()>,
929 computation_shutdown_tx: broadcast::Sender<()>,
930 chain: Chain,
931 router_address: Bytes,
932 market_event_tx: broadcast::Sender<MarketEvent>,
933}
934
935impl Solver {
936 pub fn market_data(&self) -> MarketData {
938 self.market_data.clone()
939 }
940
941 pub fn derived_data(&self) -> SharedDerivedDataRef {
943 Arc::clone(&self.derived_data)
944 }
945
946 pub fn subscribe_market_events(&self) -> broadcast::Receiver<crate::feed::events::MarketEvent> {
951 self.market_event_tx.subscribe()
952 }
953
954 pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
960 self.router.quote(request).await
961 }
962
963 pub async fn wait_until_ready(&self, timeout: Duration) -> Result<(), WaitReadyError> {
980 const POLL_INTERVAL: Duration = Duration::from_millis(500);
981
982 let deadline = tokio::time::Instant::now() + timeout;
983
984 loop {
985 let market_ready = self
986 .market_data
987 .read()
988 .await
989 .last_updated()
990 .is_some();
991 let derived_ready = self
992 .derived_data
993 .read()
994 .await
995 .derived_data_ready();
996
997 if market_ready && derived_ready {
998 return Ok(());
999 }
1000
1001 if tokio::time::Instant::now() >= deadline {
1002 return Err(WaitReadyError { timeout_ms: timeout.as_millis() as u64 });
1003 }
1004
1005 tokio::time::sleep(POLL_INTERVAL).await;
1006 }
1007 }
1008
1009 #[cfg(feature = "test-utils")]
1022 pub async fn from_recording(
1023 chain: Chain,
1024 updates: Vec<tycho_simulation::protocol::models::Update>,
1025 pools: std::collections::HashMap<String, PoolConfig>,
1026 gas_price_wei: Option<num_bigint::BigUint>,
1027 ) -> Result<Self, SolverBuildError> {
1028 if pools.is_empty() {
1029 return Err(SolverBuildError::NoPools);
1030 }
1031
1032 let market_data = MarketData::new_shared();
1033
1034 let feed_config =
1036 TychoFeedConfig::new("ws://replay".to_string(), chain, None, false, vec![], 0.0);
1037 let feed = TychoFeed::new(feed_config, market_data.clone());
1038 let market_event_tx = feed.event_sender();
1039 let _feed_rx = feed.subscribe();
1040
1041 for update in updates {
1042 feed.handle_tycho_message(update)
1043 .await
1044 .map_err(|e| SolverBuildError::Replay(e.to_string()))?;
1045 }
1046
1047 let gas_price = match gas_price_wei {
1049 Some(price) => price,
1050 None => {
1051 tracing::warn!("no recorded gas price, defaulting to 10 gwei");
1052 num_bigint::BigUint::from(10_000_000_000u64)
1053 }
1054 };
1055 let block_number = match market_data.read().await.last_updated() {
1056 Some(block) => block.number(),
1057 None => {
1058 tracing::warn!("no block number from replayed updates, defaulting to 0");
1059 0
1060 }
1061 };
1062 {
1063 let mut market = market_data.write().await;
1064 market.update_gas_price(BlockGasPrice {
1065 block_number,
1066 block_hash: Default::default(),
1067 block_timestamp: 0,
1068 pricing: GasPrice::Legacy { gas_price },
1069 });
1070 }
1071
1072 let gas_token = native_token(&chain).map_err(|_| SolverBuildError::GasToken)?;
1074 let computation_config = ComputationManagerConfig::new()
1075 .with_gas_token(gas_token)
1076 .with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
1077 let (computation_manager, _) =
1078 ComputationManager::new(computation_config, market_data.clone())
1079 .map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;
1080
1081 let derived_data: SharedDerivedDataRef = computation_manager.store();
1082 let derived_event_tx = computation_manager.event_sender();
1083
1084 let computation_event_rx = feed.subscribe();
1085 let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);
1086
1087 let computation_handle = tokio::spawn(async move {
1088 computation_manager
1089 .run(computation_event_rx, computation_shutdown_rx)
1090 .await;
1091 });
1092
1093 let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
1095 let mut worker_pools: Vec<WorkerPool> = Vec::new();
1096 let mut max_timeout_ms = 0u64;
1097
1098 for (name, pool_cfg) in &pools {
1099 let algo_cfg = AlgorithmConfig::new(
1100 pool_cfg.min_hops(),
1101 pool_cfg.max_hops(),
1102 Duration::from_millis(pool_cfg.timeout_ms()),
1103 pool_cfg.max_routes(),
1104 )?;
1105
1106 let pool_event_rx = feed.subscribe();
1107 let derived_rx = derived_event_tx.subscribe();
1108
1109 let (worker_pool, task_handle) = WorkerPoolBuilder::new()
1110 .name(name.clone())
1111 .algorithm(pool_cfg.algorithm().to_string())
1112 .algorithm_config(algo_cfg)
1113 .num_workers(pool_cfg.num_workers())
1114 .task_queue_capacity(pool_cfg.task_queue_capacity())
1115 .build(market_data.clone(), Arc::clone(&derived_data), pool_event_rx, derived_rx)?;
1116
1117 solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
1118 max_timeout_ms = max_timeout_ms.max(pool_cfg.timeout_ms());
1119 worker_pools.push(worker_pool);
1120 }
1121
1122 let encoder = {
1124 let registry = SwapEncoderRegistry::new(chain)
1125 .add_default_encoders(None)
1126 .map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
1127 Encoder::new(chain, registry).map_err(|e| SolverBuildError::Encoder(e.to_string()))?
1128 };
1129
1130 let router_address = encoder.router_address().clone();
1131 let router_fees = encoder.router_fees();
1135 router_fees.set(crate::encoding::router_fees::RouterFees::new(
1136 100_000_000,
1137 0,
1138 0,
1139 std::collections::HashMap::new(),
1140 ));
1141 let router_config = WorkerPoolRouterConfig::default()
1142 .with_timeout(Duration::from_millis(max_timeout_ms.max(5000)))
1143 .with_min_responses(defaults::ROUTER_MIN_RESPONSES);
1144 let router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);
1145
1146 let market_read = market_data.read().await;
1148 let added = market_read.component_topology();
1149 drop(market_read);
1150
1151 if market_event_tx
1152 .send(MarketEvent::MarketUpdated {
1153 added_components: added,
1154 removed_components: vec![],
1155 updated_components: vec![],
1156 })
1157 .is_err()
1158 {
1159 tracing::warn!("no receivers for initial MarketUpdated broadcast");
1160 }
1161
1162 let feed_handle = tokio::spawn(futures::future::pending::<()>());
1165 let gas_price_handle = tokio::spawn(async { });
1166 let router_fee_handle = tokio::spawn(async { });
1167
1168 Ok(Solver {
1169 router,
1170 worker_pools,
1171 market_data,
1172 derived_data,
1173 router_fees,
1174 feed_handle,
1175 gas_price_handle,
1176 router_fee_handle,
1177 computation_handle,
1178 computation_shutdown_tx,
1179 chain,
1180 router_address,
1181 market_event_tx,
1182 })
1183 }
1184
1185 pub fn shutdown(self) {
1187 let _ = self.computation_shutdown_tx.send(());
1188 for pool in self.worker_pools {
1189 pool.shutdown();
1190 }
1191 self.feed_handle.abort();
1192 self.gas_price_handle.abort();
1193 self.router_fee_handle.abort();
1194 }
1195
1196 pub fn into_parts(self) -> SolverParts {
1198 SolverParts {
1199 router: self.router,
1200 worker_pools: self.worker_pools,
1201 market_data: self.market_data,
1202 derived_data: self.derived_data,
1203 router_fees: self.router_fees,
1204 feed_handle: self.feed_handle,
1205 gas_price_handle: self.gas_price_handle,
1206 router_fee_handle: self.router_fee_handle,
1207 computation_handle: self.computation_handle,
1208 computation_shutdown_tx: self.computation_shutdown_tx,
1209 chain: self.chain,
1210 router_address: self.router_address,
1211 }
1212 }
1213}
1214
1215pub struct SolverParts {
1219 router: WorkerPoolRouter,
1221 worker_pools: Vec<WorkerPool>,
1223 market_data: MarketData,
1225 derived_data: SharedDerivedDataRef,
1227 router_fees: SharedRouterFees,
1229 feed_handle: JoinHandle<()>,
1231 gas_price_handle: JoinHandle<()>,
1233 router_fee_handle: JoinHandle<()>,
1235 computation_handle: JoinHandle<()>,
1237 computation_shutdown_tx: broadcast::Sender<()>,
1239 chain: Chain,
1241 router_address: Bytes,
1243}
1244
1245impl SolverParts {
1246 pub fn chain(&self) -> Chain {
1248 self.chain
1249 }
1250
1251 pub fn router_address(&self) -> &Bytes {
1253 &self.router_address
1254 }
1255
1256 pub fn worker_pools(&self) -> &[WorkerPool] {
1258 &self.worker_pools
1259 }
1260
1261 pub fn market_data(&self) -> &MarketData {
1263 &self.market_data
1264 }
1265
1266 pub fn derived_data(&self) -> &SharedDerivedDataRef {
1268 &self.derived_data
1269 }
1270
1271 pub fn router_fees(&self) -> &SharedRouterFees {
1273 &self.router_fees
1274 }
1275
1276 pub fn into_router(self) -> WorkerPoolRouter {
1278 self.router
1279 }
1280
1281 #[allow(clippy::type_complexity)]
1283 pub fn into_components(
1284 self,
1285 ) -> (
1286 WorkerPoolRouter,
1287 Vec<WorkerPool>,
1288 MarketData,
1289 SharedDerivedDataRef,
1290 JoinHandle<()>,
1291 JoinHandle<()>,
1292 JoinHandle<()>,
1293 JoinHandle<()>,
1294 broadcast::Sender<()>,
1295 ) {
1296 (
1297 self.router,
1298 self.worker_pools,
1299 self.market_data,
1300 self.derived_data,
1301 self.feed_handle,
1302 self.gas_price_handle,
1303 self.router_fee_handle,
1304 self.computation_handle,
1305 self.computation_shutdown_tx,
1306 )
1307 }
1308}