1use std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration};
14
15use num_cpus;
16use serde::{Deserialize, Serialize};
17use tokio::{sync::broadcast, task::JoinHandle};
18use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
19use tycho_simulation::{
20 tycho_common::{models::Chain, Bytes},
21 tycho_core::models::Address,
22 tycho_ethereum::rpc::EthereumRpcClient,
23};
24
25use crate::{
26 algorithm::{AlgorithmConfig, AlgorithmError},
27 derived::{ComputationManager, ComputationManagerConfig, SharedDerivedDataRef},
28 encoding::encoder::Encoder,
29 feed::{
30 events::MarketEventHandler, gas::GasPriceFetcher, market_data::MarketData,
31 tycho_feed::TychoFeed, TychoFeedConfig,
32 },
33 graph::EdgeWeightUpdaterWithDerived,
34 price_guard::{
35 guard::PriceGuard, provider::PriceProvider, provider_registry::PriceProviderRegistry,
36 },
37 types::constants::native_token,
38 worker_pool::{
39 pool::{WorkerPool, WorkerPoolBuilder},
40 registry::UnknownAlgorithmError,
41 },
42 worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter},
43 Algorithm, Quote, QuoteRequest, SolveError,
44};
45
46pub mod defaults {
52 use std::time::Duration;
53
54 pub const MIN_TOKEN_QUALITY: i32 = 100;
56 pub const TRADED_N_DAYS_AGO: u64 = 3;
58 pub const TVL_BUFFER_RATIO: f64 = 1.1;
60 pub const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
62 pub const RECONNECT_DELAY: Duration = Duration::from_secs(5);
64 pub const ROUTER_MIN_RESPONSES: usize = 0;
67 pub const POOL_TASK_QUEUE_CAPACITY: usize = 1000;
69 pub const POOL_MIN_HOPS: usize = 1;
71 pub const POOL_MAX_HOPS: usize = 3;
73 pub const POOL_TIMEOUT_MS: u64 = 100;
75}
76
77const DEFAULT_TYCHO_USE_TLS: bool = true;
79const DEFAULT_DEPTH_SLIPPAGE_THRESHOLD: f64 = 0.01;
80const DEFAULT_ROUTER_TIMEOUT: Duration = Duration::from_secs(10);
83
84fn default_task_queue_capacity() -> usize {
87 defaults::POOL_TASK_QUEUE_CAPACITY
88}
89
90fn default_min_hops() -> usize {
91 defaults::POOL_MIN_HOPS
92}
93
94fn default_max_hops() -> usize {
95 defaults::POOL_MAX_HOPS
96}
97
98fn default_algo_timeout_ms() -> u64 {
99 defaults::POOL_TIMEOUT_MS
100}
101
102fn parse_connector_tokens(
103 raw: Option<&[String]>,
104) -> Result<Option<HashSet<Address>>, SolverBuildError> {
105 let Some(strings) = raw else {
106 return Ok(None);
107 };
108 let mut set = HashSet::with_capacity(strings.len());
109 for s in strings {
110 let addr = Address::from_str(s).map_err(|e| AlgorithmError::InvalidConfiguration {
111 reason: format!("connector_tokens: invalid address {s:?}: {e}"),
112 })?;
113 set.insert(addr);
114 }
115 Ok(Some(set))
116}
117
118#[must_use]
120#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct PoolConfig {
122 algorithm: String,
124 #[serde(default = "num_cpus::get")]
126 num_workers: usize,
127 #[serde(default = "default_task_queue_capacity")]
129 task_queue_capacity: usize,
130 #[serde(default = "default_min_hops")]
132 min_hops: usize,
133 #[serde(default = "default_max_hops")]
135 max_hops: usize,
136 #[serde(default = "default_algo_timeout_ms")]
138 timeout_ms: u64,
139 #[serde(default)]
141 max_routes: Option<usize>,
142 #[serde(default)]
145 connector_tokens: Option<Vec<String>>,
146}
147
148impl PoolConfig {
149 pub fn new(algorithm: impl Into<String>) -> Self {
151 Self {
152 algorithm: algorithm.into(),
153 num_workers: num_cpus::get(),
154 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
155 min_hops: defaults::POOL_MIN_HOPS,
156 max_hops: defaults::POOL_MAX_HOPS,
157 timeout_ms: defaults::POOL_TIMEOUT_MS,
158 max_routes: None,
159 connector_tokens: None,
160 }
161 }
162
163 pub fn algorithm(&self) -> &str {
165 &self.algorithm
166 }
167
168 pub fn num_workers(&self) -> usize {
170 self.num_workers
171 }
172
173 pub fn with_num_workers(mut self, num_workers: usize) -> Self {
175 self.num_workers = num_workers;
176 self
177 }
178
179 pub fn with_task_queue_capacity(mut self, task_queue_capacity: usize) -> Self {
181 self.task_queue_capacity = task_queue_capacity;
182 self
183 }
184
185 pub fn with_min_hops(mut self, min_hops: usize) -> Self {
187 self.min_hops = min_hops;
188 self
189 }
190
191 pub fn with_max_hops(mut self, max_hops: usize) -> Self {
193 self.max_hops = max_hops;
194 self
195 }
196
197 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
199 self.timeout_ms = timeout_ms;
200 self
201 }
202
203 pub fn with_max_routes(mut self, max_routes: Option<usize>) -> Self {
205 self.max_routes = max_routes;
206 self
207 }
208
209 pub fn task_queue_capacity(&self) -> usize {
211 self.task_queue_capacity
212 }
213
214 pub fn min_hops(&self) -> usize {
216 self.min_hops
217 }
218
219 pub fn max_hops(&self) -> usize {
221 self.max_hops
222 }
223
224 pub fn timeout_ms(&self) -> u64 {
226 self.timeout_ms
227 }
228
229 pub fn max_routes(&self) -> Option<usize> {
231 self.max_routes
232 }
233
234 pub fn with_connector_tokens(mut self, tokens: Vec<String>) -> Self {
237 self.connector_tokens = Some(tokens);
238 self
239 }
240
241 pub fn connector_tokens(&self) -> Option<&[String]> {
243 self.connector_tokens.as_deref()
244 }
245}
246
247#[derive(Debug, thiserror::Error)]
249#[error("timed out after {timeout_ms}ms waiting for market data and derived computations")]
250pub struct WaitReadyError {
251 timeout_ms: u64,
252}
253
254#[non_exhaustive]
256#[derive(Debug, thiserror::Error)]
257pub enum SolverBuildError {
258 #[error("failed to create ethereum RPC client: {0}")]
260 RpcClient(String),
261 #[error(transparent)]
263 AlgorithmConfig(#[from] AlgorithmError),
264 #[error("failed to create computation manager: {0}")]
266 ComputationManager(String),
267 #[error("failed to create encoder: {0}")]
269 Encoder(String),
270 #[error(transparent)]
272 UnknownAlgorithm(#[from] UnknownAlgorithmError),
273 #[error("gas token not configured for chain")]
275 GasToken,
276 #[error("no worker pools configured")]
278 NoPools,
279}
280
281enum PoolEntry {
283 BuiltIn {
284 name: String,
285 algorithm: String,
286 num_workers: usize,
287 task_queue_capacity: usize,
288 min_hops: usize,
289 max_hops: usize,
290 timeout_ms: u64,
291 max_routes: Option<usize>,
292 connector_tokens: Option<HashSet<Address>>,
293 },
294 Custom(CustomPoolEntry),
295}
296
297struct CustomPoolEntry {
299 name: String,
300 num_workers: usize,
301 task_queue_capacity: usize,
302 min_hops: usize,
303 max_hops: usize,
304 timeout_ms: u64,
305 max_routes: Option<usize>,
306 configure: Box<dyn FnOnce(WorkerPoolBuilder) -> WorkerPoolBuilder + Send>,
308}
309
310#[must_use = "a builder does nothing until .build() is called"]
312pub struct FyndBuilder {
315 chain: Chain,
316 tycho_url: String,
317 rpc_url: String,
318 protocols: Vec<String>,
319 min_tvl: f64,
320 tycho_api_key: Option<String>,
321 tycho_use_tls: bool,
322 min_token_quality: i32,
323 traded_n_days_ago: u64,
324 tvl_buffer_ratio: f64,
325 gas_refresh_interval: Duration,
326 reconnect_delay: Duration,
327 blocklisted_components: HashSet<String>,
328 router_timeout: Duration,
329 router_min_responses: usize,
330 encoder: Option<Encoder>,
331 pools: Vec<PoolEntry>,
332 price_guard_enabled: bool,
333 price_providers: Vec<Box<dyn PriceProvider>>,
334}
335
336impl FyndBuilder {
337 pub fn new(
339 chain: Chain,
340 tycho_url: impl Into<String>,
341 rpc_url: impl Into<String>,
342 protocols: Vec<String>,
343 min_tvl: f64,
344 ) -> Self {
345 Self {
346 chain,
347 tycho_url: tycho_url.into(),
348 rpc_url: rpc_url.into(),
349 protocols,
350 min_tvl,
351 tycho_api_key: None,
352 tycho_use_tls: DEFAULT_TYCHO_USE_TLS,
353 min_token_quality: defaults::MIN_TOKEN_QUALITY,
354 traded_n_days_ago: defaults::TRADED_N_DAYS_AGO,
355 tvl_buffer_ratio: defaults::TVL_BUFFER_RATIO,
356 gas_refresh_interval: defaults::GAS_REFRESH_INTERVAL,
357 reconnect_delay: defaults::RECONNECT_DELAY,
358 blocklisted_components: HashSet::new(),
359 router_timeout: DEFAULT_ROUTER_TIMEOUT,
360 router_min_responses: defaults::ROUTER_MIN_RESPONSES,
361 encoder: None,
362 pools: Vec::new(),
363 price_guard_enabled: false,
364 price_providers: Vec::new(),
365 }
366 }
367
368 pub fn chain(&self) -> Chain {
370 self.chain
371 }
372
373 pub fn tycho_api_key(mut self, key: impl Into<String>) -> Self {
375 self.tycho_api_key = Some(key.into());
376 self
377 }
378
379 pub fn min_tvl(mut self, min_tvl: f64) -> Self {
381 self.min_tvl = min_tvl;
382 self
383 }
384
385 pub fn tycho_use_tls(mut self, use_tls: bool) -> Self {
387 self.tycho_use_tls = use_tls;
388 self
389 }
390
391 pub fn min_token_quality(mut self, quality: i32) -> Self {
394 self.min_token_quality = quality;
395 self
396 }
397
398 pub fn traded_n_days_ago(mut self, days: u64) -> Self {
400 self.traded_n_days_ago = days;
401 self
402 }
403
404 pub fn tvl_buffer_ratio(mut self, ratio: f64) -> Self {
406 self.tvl_buffer_ratio = ratio;
407 self
408 }
409
410 pub fn gas_refresh_interval(mut self, interval: Duration) -> Self {
412 self.gas_refresh_interval = interval;
413 self
414 }
415
416 pub fn reconnect_delay(mut self, delay: Duration) -> Self {
418 self.reconnect_delay = delay;
419 self
420 }
421
422 pub fn blocklisted_components(mut self, components: HashSet<String>) -> Self {
424 self.blocklisted_components = components;
425 self
426 }
427
428 pub fn worker_router_timeout(mut self, timeout: Duration) -> Self {
430 self.router_timeout = timeout;
431 self
432 }
433
434 pub fn worker_router_min_responses(mut self, min: usize) -> Self {
436 self.router_min_responses = min;
437 self
438 }
439
440 pub fn encoder(mut self, encoder: Encoder) -> Self {
442 self.encoder = Some(encoder);
443 self
444 }
445
446 pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
448 self.pools.push(PoolEntry::BuiltIn {
449 name: "default".to_string(),
450 algorithm: algorithm.into(),
451 num_workers: num_cpus::get(),
452 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
453 min_hops: defaults::POOL_MIN_HOPS,
454 max_hops: defaults::POOL_MAX_HOPS,
455 timeout_ms: defaults::POOL_TIMEOUT_MS,
456 max_routes: None,
457 connector_tokens: None,
458 });
459 self
460 }
461
462 pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
466 where
467 A: Algorithm + 'static,
468 A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
469 F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
470 {
471 let name = name.into();
472 let algo_name = name.clone();
473 let configure =
474 Box::new(move |builder: WorkerPoolBuilder| builder.with_algorithm(algo_name, factory));
475 self.pools
476 .push(PoolEntry::Custom(CustomPoolEntry {
477 name,
478 num_workers: num_cpus::get(),
479 task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
480 min_hops: defaults::POOL_MIN_HOPS,
481 max_hops: defaults::POOL_MAX_HOPS,
482 timeout_ms: defaults::POOL_TIMEOUT_MS,
483 max_routes: None,
484 configure,
485 }));
486 self
487 }
488
489 pub fn add_default_price_providers(self) -> Self {
496 self.register_price_provider(Box::new(
497 crate::price_guard::hyperliquid::HyperliquidProvider::default(),
498 ))
499 .register_price_provider(Box::new(
500 crate::price_guard::binance_ws::BinanceWsProvider::default(),
501 ))
502 }
503
504 pub fn register_price_provider(mut self, provider: Box<dyn PriceProvider>) -> Self {
509 self.price_providers.push(provider);
510 self
511 }
512
513 pub fn price_guard_enabled(mut self, enabled: bool) -> Self {
520 self.price_guard_enabled = enabled;
521 self
522 }
523
524 pub fn add_pool(
531 mut self,
532 name: impl Into<String>,
533 config: &PoolConfig,
534 ) -> Result<Self, SolverBuildError> {
535 let connector_tokens = parse_connector_tokens(config.connector_tokens())?;
536 self.pools.push(PoolEntry::BuiltIn {
537 name: name.into(),
538 algorithm: config.algorithm().to_string(),
539 num_workers: config.num_workers(),
540 task_queue_capacity: config.task_queue_capacity(),
541 min_hops: config.min_hops(),
542 max_hops: config.max_hops(),
543 timeout_ms: config.timeout_ms(),
544 max_routes: config.max_routes(),
545 connector_tokens,
546 });
547 Ok(self)
548 }
549
550 pub fn build(mut self) -> Result<Solver, SolverBuildError> {
556 if self.pools.is_empty() {
557 return Err(SolverBuildError::NoPools);
558 }
559
560 if self.price_providers.is_empty() {
562 self = self.add_default_price_providers();
563 }
564
565 let market_data = MarketData::new_shared();
566
567 let tycho_feed_config = TychoFeedConfig::new(
568 self.tycho_url,
569 self.chain,
570 self.tycho_api_key,
571 self.tycho_use_tls,
572 self.protocols,
573 self.min_tvl,
574 )
575 .tvl_buffer_ratio(self.tvl_buffer_ratio)
576 .gas_refresh_interval(self.gas_refresh_interval)
577 .reconnect_delay(self.reconnect_delay)
578 .min_token_quality(self.min_token_quality)
579 .traded_n_days_ago(self.traded_n_days_ago)
580 .blocklisted_components(self.blocklisted_components);
581
582 let ethereum_client = EthereumRpcClient::new(self.rpc_url.as_str())
583 .map_err(|e| SolverBuildError::RpcClient(e.to_string()))?;
584
585 let (mut gas_price_fetcher, gas_price_worker_signal_tx) =
586 GasPriceFetcher::new(ethereum_client, market_data.clone());
587
588 let mut tycho_feed = TychoFeed::new(tycho_feed_config, market_data.clone());
589 tycho_feed = tycho_feed.with_gas_price_worker_signal_tx(gas_price_worker_signal_tx);
590
591 let gas_token = native_token(&self.chain).map_err(|_| SolverBuildError::GasToken)?;
592 let computation_config = ComputationManagerConfig::new()
593 .with_gas_token(gas_token)
594 .with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
595 let (computation_manager, _) =
598 ComputationManager::new(computation_config, market_data.clone())
599 .map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;
600
601 let derived_data: SharedDerivedDataRef = computation_manager.store();
602 let derived_event_tx = computation_manager.event_sender();
603
604 let computation_event_rx = tycho_feed.subscribe();
606 let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);
607
608 let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
609 let mut worker_pools: Vec<WorkerPool> = Vec::new();
610
611 for pool_entry in self.pools {
612 let pool_event_rx = tycho_feed.subscribe();
613 let derived_rx = derived_event_tx.subscribe();
614
615 let (worker_pool, task_handle) = match pool_entry {
616 PoolEntry::BuiltIn {
617 name,
618 algorithm,
619 num_workers,
620 task_queue_capacity,
621 min_hops,
622 max_hops,
623 timeout_ms,
624 max_routes,
625 connector_tokens,
626 } => {
627 let mut algo_cfg = AlgorithmConfig::new(
628 min_hops,
629 max_hops,
630 Duration::from_millis(timeout_ms),
631 max_routes,
632 )?;
633 if let Some(tokens) = connector_tokens {
634 algo_cfg = algo_cfg.with_connector_tokens(tokens);
635 }
636 WorkerPoolBuilder::new()
637 .name(name)
638 .algorithm(algorithm)
639 .algorithm_config(algo_cfg)
640 .num_workers(num_workers)
641 .task_queue_capacity(task_queue_capacity)
642 .build(
643 market_data.clone(),
644 Arc::clone(&derived_data),
645 pool_event_rx,
646 derived_rx,
647 )?
648 }
649 PoolEntry::Custom(custom) => {
650 let algo_cfg = AlgorithmConfig::new(
651 custom.min_hops,
652 custom.max_hops,
653 Duration::from_millis(custom.timeout_ms),
654 custom.max_routes,
655 )?;
656 let builder = WorkerPoolBuilder::new()
657 .name(custom.name)
658 .algorithm_config(algo_cfg)
659 .num_workers(custom.num_workers)
660 .task_queue_capacity(custom.task_queue_capacity);
661 let builder = (custom.configure)(builder);
662 builder.build(
663 market_data.clone(),
664 Arc::clone(&derived_data),
665 pool_event_rx,
666 derived_rx,
667 )?
668 }
669 };
670
671 solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
672 worker_pools.push(worker_pool);
673 }
674
675 let encoder = match self.encoder {
676 Some(enc) => enc,
677 None => {
678 let registry = SwapEncoderRegistry::new(self.chain)
679 .add_default_encoders(None)
680 .map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
681 Encoder::new(self.chain, registry)
682 .map_err(|e| SolverBuildError::Encoder(e.to_string()))?
683 }
684 };
685
686 let chain = self.chain;
687 let router_address = encoder.router_address().clone();
688
689 let router_config = WorkerPoolRouterConfig::default()
692 .with_timeout(self.router_timeout)
693 .with_min_responses(self.router_min_responses);
694 let mut router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);
695
696 if self.price_guard_enabled {
697 let mut registry = PriceProviderRegistry::new();
698 let mut worker_handles = Vec::new();
699 for mut provider in self.price_providers {
700 worker_handles.push(provider.start(market_data.clone()));
701 registry = registry.register(provider);
702 }
703 let price_guard = PriceGuard::new(registry, worker_handles);
704 router = router.with_price_guard(price_guard);
705 }
706
707 let feed_handle = tokio::spawn(async move {
708 if let Err(e) = tycho_feed.run().await {
709 tracing::error!(error = %e, "tycho feed error");
710 }
711 });
712
713 let gas_price_handle = tokio::spawn(async move {
714 if let Err(e) = gas_price_fetcher.run().await {
715 tracing::error!(error = %e, "gas price fetcher error");
716 }
717 });
718
719 let computation_handle = tokio::spawn(async move {
720 computation_manager
721 .run(computation_event_rx, computation_shutdown_rx)
722 .await;
723 });
724
725 Ok(Solver {
726 router,
727 worker_pools,
728 market_data,
729 derived_data,
730 feed_handle,
731 gas_price_handle,
732 computation_handle,
733 computation_shutdown_tx,
734 chain,
735 router_address,
736 })
737 }
738}
739
740pub struct Solver {
742 router: WorkerPoolRouter,
743 worker_pools: Vec<WorkerPool>,
744 market_data: MarketData,
745 derived_data: SharedDerivedDataRef,
746 feed_handle: JoinHandle<()>,
747 gas_price_handle: JoinHandle<()>,
748 computation_handle: JoinHandle<()>,
749 computation_shutdown_tx: broadcast::Sender<()>,
750 chain: Chain,
751 router_address: Bytes,
752}
753
754impl Solver {
755 pub fn market_data(&self) -> MarketData {
757 self.market_data.clone()
758 }
759
760 pub fn derived_data(&self) -> SharedDerivedDataRef {
762 Arc::clone(&self.derived_data)
763 }
764
765 pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
771 self.router.quote(request).await
772 }
773
774 pub async fn wait_until_ready(&self, timeout: Duration) -> Result<(), WaitReadyError> {
790 const POLL_INTERVAL: Duration = Duration::from_millis(500);
791
792 let deadline = tokio::time::Instant::now() + timeout;
793
794 loop {
795 let market_ready = self
796 .market_data
797 .read()
798 .await
799 .last_updated()
800 .is_some();
801 let derived_ready = self
802 .derived_data
803 .read()
804 .await
805 .derived_data_ready();
806
807 if market_ready && derived_ready {
808 return Ok(());
809 }
810
811 if tokio::time::Instant::now() >= deadline {
812 return Err(WaitReadyError { timeout_ms: timeout.as_millis() as u64 });
813 }
814
815 tokio::time::sleep(POLL_INTERVAL).await;
816 }
817 }
818
819 pub fn shutdown(self) {
821 let _ = self.computation_shutdown_tx.send(());
822 for pool in self.worker_pools {
823 pool.shutdown();
824 }
825 self.feed_handle.abort();
826 self.gas_price_handle.abort();
827 }
828
829 pub fn into_parts(self) -> SolverParts {
831 SolverParts {
832 router: self.router,
833 worker_pools: self.worker_pools,
834 market_data: self.market_data,
835 derived_data: self.derived_data,
836 feed_handle: self.feed_handle,
837 gas_price_handle: self.gas_price_handle,
838 computation_handle: self.computation_handle,
839 computation_shutdown_tx: self.computation_shutdown_tx,
840 chain: self.chain,
841 router_address: self.router_address,
842 }
843 }
844}
845
846pub struct SolverParts {
850 router: WorkerPoolRouter,
852 worker_pools: Vec<WorkerPool>,
854 market_data: MarketData,
856 derived_data: SharedDerivedDataRef,
858 feed_handle: JoinHandle<()>,
860 gas_price_handle: JoinHandle<()>,
862 computation_handle: JoinHandle<()>,
864 computation_shutdown_tx: broadcast::Sender<()>,
866 chain: Chain,
868 router_address: Bytes,
870}
871
872impl SolverParts {
873 pub fn chain(&self) -> Chain {
875 self.chain
876 }
877
878 pub fn router_address(&self) -> &Bytes {
880 &self.router_address
881 }
882
883 pub fn worker_pools(&self) -> &[WorkerPool] {
885 &self.worker_pools
886 }
887
888 pub fn market_data(&self) -> &MarketData {
890 &self.market_data
891 }
892
893 pub fn derived_data(&self) -> &SharedDerivedDataRef {
895 &self.derived_data
896 }
897
898 pub fn into_router(self) -> WorkerPoolRouter {
900 self.router
901 }
902
903 #[allow(clippy::type_complexity)]
905 pub fn into_components(
906 self,
907 ) -> (
908 WorkerPoolRouter,
909 Vec<WorkerPool>,
910 MarketData,
911 SharedDerivedDataRef,
912 JoinHandle<()>,
913 JoinHandle<()>,
914 JoinHandle<()>,
915 broadcast::Sender<()>,
916 ) {
917 (
918 self.router,
919 self.worker_pools,
920 self.market_data,
921 self.derived_data,
922 self.feed_handle,
923 self.gas_price_handle,
924 self.computation_handle,
925 self.computation_shutdown_tx,
926 )
927 }
928}