Skip to main content

fynd_core/
solver.rs

1//! High-level solver setup via [`FyndBuilder`].
2//!
3//! [`FyndBuilder`] assembles the full Tycho feed + gas fetcher + computation
4//! manager + one or more worker pools + encoder + router pipeline with sensible
5//! defaults. For simple cases a single call chain is all that's needed:
6//!
7//! ```ignore
8//! let solver = FyndBuilder::new(chain, tycho_url, rpc_url, protocols, min_tvl)
9//!     .tycho_api_key(key)
10//!     .algorithm("most_liquid")
11//!     .build()?;
12//! ```
13use std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration};
14
15use num_cpus;
16use serde::{Deserialize, Serialize};
17use tokio::{sync::broadcast, task::JoinHandle};
18use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
19#[cfg(feature = "test-utils")]
20use tycho_simulation::tycho_ethereum::gas::{BlockGasPrice, GasPrice};
21use tycho_simulation::{
22    evm::pending::PendingBlockProcessor,
23    tycho_common::{models::Chain, traits::TxDeltaIndexer, Bytes},
24    tycho_core::models::Address,
25    tycho_ethereum::rpc::EthereumRpcClient,
26};
27
28use crate::{
29    algorithm::{AlgorithmConfig, AlgorithmError},
30    derived::{ComputationManager, ComputationManagerConfig, SharedDerivedDataRef},
31    encoding::{encoder::Encoder, fee_fetcher::RouterFeeFetcher, router_fees::SharedRouterFees},
32    feed::{
33        events::{MarketEvent, MarketEventHandler},
34        gas::GasPriceFetcher,
35        market_data::MarketData,
36        tycho_feed::TychoFeed,
37        TychoFeedConfig,
38    },
39    graph::EdgeWeightUpdaterWithDerived,
40    price_guard::{
41        guard::PriceGuard, provider::PriceProvider, provider_registry::PriceProviderRegistry,
42    },
43    types::constants::native_token,
44    worker_pool::{
45        pool::{WorkerPool, WorkerPoolBuilder},
46        registry::UnknownAlgorithmError,
47    },
48    worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter},
49    Algorithm, Quote, QuoteRequest, SolveError,
50};
51
52/// Default values for [`FyndBuilder`] configuration and [`PoolConfig`] deserialization.
53///
54/// These are the single source of truth for all tunable defaults. Downstream
55/// crates (e.g. `fynd-rpc`) should re-export or reference these rather than
56/// redeclaring their own copies.
57pub mod defaults {
58    use std::time::Duration;
59
60    /// Minimum token quality score required for a token to be included in routing.
61    pub const MIN_TOKEN_QUALITY: i32 = 100;
62    /// Maximum age (in days) of trading history required for a token to be considered liquid.
63    pub const TRADED_N_DAYS_AGO: u64 = 3;
64    /// Multiplier applied to a pool's TVL when estimating available liquidity.
65    pub const TVL_BUFFER_RATIO: f64 = 1.1;
66    /// How often the gas price is refreshed from the RPC node.
67    pub const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
68    /// How often router fees are refreshed from the on-chain FeeCalculator contract.
69    pub const ROUTER_FEE_REFRESH_INTERVAL: Duration = Duration::from_secs(300);
70    /// Delay before reconnecting to the Tycho feed after a disconnect.
71    pub const RECONNECT_DELAY: Duration = Duration::from_secs(5);
72    /// Minimum number of solver pool responses required before returning a quote (`0` = wait for
73    /// all).
74    pub const ROUTER_MIN_RESPONSES: usize = 0;
75    /// Capacity of the task queue for each worker pool.
76    pub const POOL_TASK_QUEUE_CAPACITY: usize = 1000;
77    /// Minimum number of hops allowed in a route.
78    pub const POOL_MIN_HOPS: usize = 1;
79    /// Maximum number of hops allowed in a route.
80    pub const POOL_MAX_HOPS: usize = 3;
81    /// Per-pool solve timeout in milliseconds.
82    pub const POOL_TIMEOUT_MS: u64 = 100;
83}
84
85// Internal-only defaults not shared with downstream crates.
86const DEFAULT_TYCHO_USE_TLS: bool = true;
87const DEFAULT_DEPTH_SLIPPAGE_THRESHOLD: f64 = 0.01;
88/// Generous router timeout for standalone (non-server) use. HTTP services should
89/// override this to a tighter value appropriate for their SLA.
90const DEFAULT_ROUTER_TIMEOUT: Duration = Duration::from_secs(10);
91
92// serde requires free functions for `#[serde(default = "...")]` — these delegate to the
93// defaults module so both deserialization and the builder stay in sync.
94fn default_task_queue_capacity() -> usize {
95    defaults::POOL_TASK_QUEUE_CAPACITY
96}
97
98fn default_min_hops() -> usize {
99    defaults::POOL_MIN_HOPS
100}
101
102fn default_max_hops() -> usize {
103    defaults::POOL_MAX_HOPS
104}
105
106fn default_algo_timeout_ms() -> u64 {
107    defaults::POOL_TIMEOUT_MS
108}
109
110fn parse_connector_tokens(
111    raw: Option<&[String]>,
112) -> Result<Option<HashSet<Address>>, SolverBuildError> {
113    let Some(strings) = raw else {
114        return Ok(None);
115    };
116    let mut set = HashSet::with_capacity(strings.len());
117    for s in strings {
118        let addr = Address::from_str(s).map_err(|e| AlgorithmError::InvalidConfiguration {
119            reason: format!("connector_tokens: invalid address {s:?}: {e}"),
120        })?;
121        set.insert(addr);
122    }
123    Ok(Some(set))
124}
125
126/// Per-pool configuration for [`FyndBuilder::add_pool`].
127#[must_use]
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct PoolConfig {
130    /// Algorithm name for this pool (e.g., `"most_liquid"`).
131    algorithm: String,
132    /// Number of worker threads for this pool.
133    #[serde(default = "num_cpus::get")]
134    num_workers: usize,
135    /// Task queue capacity for this pool.
136    #[serde(default = "default_task_queue_capacity")]
137    task_queue_capacity: usize,
138    /// Minimum hops to search (must be >= 1).
139    #[serde(default = "default_min_hops")]
140    min_hops: usize,
141    /// Maximum hops to search.
142    #[serde(default = "default_max_hops")]
143    max_hops: usize,
144    /// Timeout for solving in milliseconds.
145    #[serde(default = "default_algo_timeout_ms")]
146    timeout_ms: u64,
147    /// Maximum number of paths to simulate per solve. `None` simulates all scored paths.
148    #[serde(default)]
149    max_routes: Option<usize>,
150    /// Lowercase hex addresses (e.g. `"0xc02aaa…"`) allowed as intermediate routing hops.
151    /// Absent = no restriction. Typically 3–10 entries (e.g. WETH, USDC, USDT, DAI).
152    #[serde(default)]
153    connector_tokens: Option<Vec<String>>,
154}
155
156impl PoolConfig {
157    /// Creates a new pool config with the given algorithm name and defaults for all other fields.
158    pub fn new(algorithm: impl Into<String>) -> Self {
159        Self {
160            algorithm: algorithm.into(),
161            num_workers: num_cpus::get(),
162            task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
163            min_hops: defaults::POOL_MIN_HOPS,
164            max_hops: defaults::POOL_MAX_HOPS,
165            timeout_ms: defaults::POOL_TIMEOUT_MS,
166            max_routes: None,
167            connector_tokens: None,
168        }
169    }
170
171    /// Returns the algorithm name.
172    pub fn algorithm(&self) -> &str {
173        &self.algorithm
174    }
175
176    /// Returns the number of worker threads.
177    pub fn num_workers(&self) -> usize {
178        self.num_workers
179    }
180
181    /// Sets the number of worker threads.
182    pub fn with_num_workers(mut self, num_workers: usize) -> Self {
183        self.num_workers = num_workers;
184        self
185    }
186
187    /// Sets the task queue capacity.
188    pub fn with_task_queue_capacity(mut self, task_queue_capacity: usize) -> Self {
189        self.task_queue_capacity = task_queue_capacity;
190        self
191    }
192
193    /// Sets the minimum hops.
194    pub fn with_min_hops(mut self, min_hops: usize) -> Self {
195        self.min_hops = min_hops;
196        self
197    }
198
199    /// Sets the maximum hops.
200    pub fn with_max_hops(mut self, max_hops: usize) -> Self {
201        self.max_hops = max_hops;
202        self
203    }
204
205    /// Sets the timeout in milliseconds.
206    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
207        self.timeout_ms = timeout_ms;
208        self
209    }
210
211    /// Sets the maximum number of routes to simulate.
212    pub fn with_max_routes(mut self, max_routes: Option<usize>) -> Self {
213        self.max_routes = max_routes;
214        self
215    }
216
217    /// Returns the task queue capacity.
218    pub fn task_queue_capacity(&self) -> usize {
219        self.task_queue_capacity
220    }
221
222    /// Returns the minimum hops.
223    pub fn min_hops(&self) -> usize {
224        self.min_hops
225    }
226
227    /// Returns the maximum hops.
228    pub fn max_hops(&self) -> usize {
229        self.max_hops
230    }
231
232    /// Returns the timeout in milliseconds.
233    pub fn timeout_ms(&self) -> u64 {
234        self.timeout_ms
235    }
236
237    /// Returns the maximum number of routes to simulate.
238    pub fn max_routes(&self) -> Option<usize> {
239        self.max_routes
240    }
241
242    /// Restricts intermediate hops to the given token addresses (hex strings with or without `0x`
243    /// prefix). Absent = no restriction.
244    pub fn with_connector_tokens(mut self, tokens: Vec<String>) -> Self {
245        self.connector_tokens = Some(tokens);
246        self
247    }
248
249    /// Returns the raw connector token address strings, or `None` if unrestricted.
250    pub fn connector_tokens(&self) -> Option<&[String]> {
251        self.connector_tokens.as_deref()
252    }
253}
254
255/// Error returned by [`Solver::wait_until_ready`].
256#[derive(Debug, thiserror::Error)]
257#[error("timed out after {timeout_ms}ms waiting for market data and derived computations")]
258pub struct WaitReadyError {
259    timeout_ms: u64,
260}
261
262/// Error returned by [`FyndBuilder::build`] and [`FyndBuilder::build_with_pending`].
263#[non_exhaustive]
264#[derive(Debug, thiserror::Error)]
265pub enum SolverBuildError {
266    /// The Ethereum RPC client could not be created (e.g. malformed URL).
267    #[error("failed to create ethereum RPC client: {0}")]
268    RpcClient(String),
269    /// An invalid algorithm configuration was supplied.
270    #[error(transparent)]
271    AlgorithmConfig(#[from] AlgorithmError),
272    /// The [`ComputationManager`] failed to initialise.
273    #[error("failed to create computation manager: {0}")]
274    ComputationManager(String),
275    /// The swap encoder could not be created for the target chain.
276    #[error("failed to create encoder: {0}")]
277    Encoder(String),
278    /// The router fee fetcher could not be created (e.g. malformed RPC URL).
279    #[error("failed to create router fee fetcher: {0}")]
280    RouterFeeFetcher(String),
281    /// A pool referenced an algorithm name that is not registered.
282    #[error(transparent)]
283    UnknownAlgorithm(#[from] UnknownAlgorithmError),
284    /// No native gas token is defined for the requested chain.
285    #[error("gas token not configured for chain")]
286    GasToken,
287    /// [`FyndBuilder::build`] was called without configuring any worker pools.
288    #[error("no worker pools configured")]
289    NoPools,
290    /// A recorded update failed to replay through the feed.
291    #[cfg(feature = "test-utils")]
292    #[error("replay failed: {0}")]
293    Replay(String),
294    /// The feed task failed before delivering the [`PendingBlockProcessor`].
295    ///
296    /// The inner string is the `DataFeedError` message from `TychoFeed::run_with_pending`
297    /// (e.g. "failed to load tokens: connection refused").
298    #[error("feed setup failed before delivering pending processor: {0}")]
299    FeedSetup(String),
300    /// The pending-processor oneshot closed without delivering a value, meaning the feed task
301    /// panicked rather than returning an error through the channel.
302    #[error("pending processor channel closed before processor was delivered")]
303    PendingChannelClosed,
304}
305
306/// Internal pool entry — either a built-in algorithm (by name) or a custom one.
307enum PoolEntry {
308    BuiltIn {
309        name: String,
310        algorithm: String,
311        num_workers: usize,
312        task_queue_capacity: usize,
313        min_hops: usize,
314        max_hops: usize,
315        timeout_ms: u64,
316        max_routes: Option<usize>,
317        connector_tokens: Option<HashSet<Address>>,
318    },
319    Custom(CustomPoolEntry),
320}
321
322/// Pool entry backed by a custom [`Algorithm`] implementation.
323struct CustomPoolEntry {
324    name: String,
325    num_workers: usize,
326    task_queue_capacity: usize,
327    min_hops: usize,
328    max_hops: usize,
329    timeout_ms: u64,
330    max_routes: Option<usize>,
331    /// Applies the custom algorithm to a `WorkerPoolBuilder`.
332    configure: Box<dyn FnOnce(WorkerPoolBuilder) -> WorkerPoolBuilder + Send>,
333}
334
335/// All components produced by [`FyndBuilder::assemble_components`], consumed by
336/// [`FyndBuilder::build`] and [`FyndBuilder::build_with_pending`].
337struct BuiltComponents {
338    tycho_feed: TychoFeed,
339    gas_price_fetcher: GasPriceFetcher<EthereumRpcClient>,
340    router_fee_fetcher: RouterFeeFetcher,
341    computation_manager: ComputationManager,
342    computation_event_rx: broadcast::Receiver<MarketEvent>,
343    computation_shutdown_tx: broadcast::Sender<()>,
344    computation_shutdown_rx: broadcast::Receiver<()>,
345    router: WorkerPoolRouter,
346    worker_pools: Vec<WorkerPool>,
347    market_data: MarketData,
348    derived_data: SharedDerivedDataRef,
349    router_fees: SharedRouterFees,
350    chain: Chain,
351    router_address: Bytes,
352    pending_indexers: Vec<(String, Box<dyn TxDeltaIndexer>)>,
353    market_event_tx: broadcast::Sender<MarketEvent>,
354}
355
356/// Builder for assembling the full solver pipeline.
357///
358/// Configures the Tycho market-data feed, gas price fetcher, derived-data
359/// computation manager, one or more worker pools, encoder, and router.
360#[must_use = "a builder does nothing until .build() is called"]
361pub struct FyndBuilder {
362    chain: Chain,
363    tycho_url: String,
364    rpc_url: String,
365    protocols: Vec<String>,
366    min_tvl: f64,
367    tycho_api_key: Option<String>,
368    tycho_use_tls: bool,
369    min_token_quality: i32,
370    traded_n_days_ago: u64,
371    tvl_buffer_ratio: f64,
372    gas_refresh_interval: Duration,
373    reconnect_delay: Duration,
374    blocklisted_components: HashSet<String>,
375    partial_blocks: bool,
376    router_timeout: Duration,
377    router_min_responses: usize,
378    encoder: Option<Encoder>,
379    pools: Vec<PoolEntry>,
380    price_guard_enabled: bool,
381    price_providers: Vec<Box<dyn PriceProvider>>,
382    pending_indexers: Vec<(String, Box<dyn TxDeltaIndexer>)>,
383}
384
385impl FyndBuilder {
386    /// Creates a new builder with the required parameters.
387    pub fn new(
388        chain: Chain,
389        tycho_url: impl Into<String>,
390        rpc_url: impl Into<String>,
391        protocols: Vec<String>,
392        min_tvl: f64,
393    ) -> Self {
394        Self {
395            chain,
396            tycho_url: tycho_url.into(),
397            rpc_url: rpc_url.into(),
398            protocols,
399            min_tvl,
400            tycho_api_key: None,
401            tycho_use_tls: DEFAULT_TYCHO_USE_TLS,
402            min_token_quality: defaults::MIN_TOKEN_QUALITY,
403            traded_n_days_ago: defaults::TRADED_N_DAYS_AGO,
404            tvl_buffer_ratio: defaults::TVL_BUFFER_RATIO,
405            gas_refresh_interval: defaults::GAS_REFRESH_INTERVAL,
406            reconnect_delay: defaults::RECONNECT_DELAY,
407            blocklisted_components: HashSet::new(),
408            partial_blocks: false,
409            router_timeout: DEFAULT_ROUTER_TIMEOUT,
410            router_min_responses: defaults::ROUTER_MIN_RESPONSES,
411            encoder: None,
412            pools: Vec::new(),
413            price_guard_enabled: false,
414            price_providers: Vec::new(),
415            pending_indexers: Vec::new(),
416        }
417    }
418
419    /// The blockchain this builder is configured for.
420    pub fn chain(&self) -> Chain {
421        self.chain
422    }
423
424    /// Sets the Tycho API key.
425    pub fn tycho_api_key(mut self, key: impl Into<String>) -> Self {
426        self.tycho_api_key = Some(key.into());
427        self
428    }
429
430    /// Overrides the minimum TVL filter set in [`FyndBuilder::new`].
431    pub fn min_tvl(mut self, min_tvl: f64) -> Self {
432        self.min_tvl = min_tvl;
433        self
434    }
435
436    /// Enables or disables TLS for the Tycho WebSocket connection (default: `true`).
437    pub fn tycho_use_tls(mut self, use_tls: bool) -> Self {
438        self.tycho_use_tls = use_tls;
439        self
440    }
441
442    /// Sets the minimum token quality score; tokens below this threshold are excluded (default:
443    /// 100).
444    pub fn min_token_quality(mut self, quality: i32) -> Self {
445        self.min_token_quality = quality;
446        self
447    }
448
449    /// Filters out pools whose last trade is older than `days` days (default: 3).
450    pub fn traded_n_days_ago(mut self, days: u64) -> Self {
451        self.traded_n_days_ago = days;
452        self
453    }
454
455    /// Multiplies reported TVL by `ratio` before applying the `min_tvl` filter (default: 1.1).
456    pub fn tvl_buffer_ratio(mut self, ratio: f64) -> Self {
457        self.tvl_buffer_ratio = ratio;
458        self
459    }
460
461    /// Sets how often the gas price is refreshed from the RPC node (default: 30 s).
462    pub fn gas_refresh_interval(mut self, interval: Duration) -> Self {
463        self.gas_refresh_interval = interval;
464        self
465    }
466
467    /// Sets the delay before reconnecting to Tycho after a disconnection (default: 5 s).
468    pub fn reconnect_delay(mut self, delay: Duration) -> Self {
469        self.reconnect_delay = delay;
470        self
471    }
472
473    /// Sets component IDs to exclude from the Tycho stream.
474    pub fn blocklisted_components(mut self, components: HashSet<String>) -> Self {
475        self.blocklisted_components = components;
476        self
477    }
478
479    /// Enables partial block (flashblock) updates from the Tycho stream (default: `false`).
480    ///
481    /// When enabled, the stream delivers pool state updates mid-block rather than only at
482    /// finalization, reducing latency. Only supported for on-chain protocols; RFQ streams are
483    /// unaffected.
484    pub fn partial_blocks(mut self, enabled: bool) -> Self {
485        self.partial_blocks = enabled;
486        self
487    }
488
489    /// Sets the worker router timeout (default: 10s).
490    pub fn worker_router_timeout(mut self, timeout: Duration) -> Self {
491        self.router_timeout = timeout;
492        self
493    }
494
495    /// Sets the minimum number of solver responses before early return (default: 0).
496    pub fn worker_router_min_responses(mut self, min: usize) -> Self {
497        self.router_min_responses = min;
498        self
499    }
500
501    /// Overrides the default encoder.
502    pub fn encoder(mut self, encoder: Encoder) -> Self {
503        self.encoder = Some(encoder);
504        self
505    }
506
507    /// Shorthand: adds a single pool named `"default"` using a built-in algorithm by name.
508    pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
509        self.pools.push(PoolEntry::BuiltIn {
510            name: "default".to_string(),
511            algorithm: algorithm.into(),
512            num_workers: num_cpus::get(),
513            task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
514            min_hops: defaults::POOL_MIN_HOPS,
515            max_hops: defaults::POOL_MAX_HOPS,
516            timeout_ms: defaults::POOL_TIMEOUT_MS,
517            max_routes: None,
518            connector_tokens: None,
519        });
520        self
521    }
522
523    /// Shorthand: adds a single pool with a custom [`Algorithm`] implementation.
524    ///
525    /// The `factory` closure is called once per worker thread.
526    pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
527    where
528        A: Algorithm + 'static,
529        A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
530        F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
531    {
532        let name = name.into();
533        let algo_name = name.clone();
534        let configure =
535            Box::new(move |builder: WorkerPoolBuilder| builder.with_algorithm(algo_name, factory));
536        self.pools
537            .push(PoolEntry::Custom(CustomPoolEntry {
538                name,
539                num_workers: num_cpus::get(),
540                task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
541                min_hops: defaults::POOL_MIN_HOPS,
542                max_hops: defaults::POOL_MAX_HOPS,
543                timeout_ms: defaults::POOL_TIMEOUT_MS,
544                max_routes: None,
545                configure,
546            }));
547        self
548    }
549
550    /// Registers the built-in price providers (Hyperliquid + Binance).
551    ///
552    /// Called automatically during [`build`](Self::build) if no providers have been
553    /// registered and the price guard is not disabled. To use only custom
554    /// providers, call [`register_price_provider`](Self::register_price_provider)
555    /// before `build()` and the defaults will be skipped.
556    pub fn add_default_price_providers(self) -> Self {
557        self.register_price_provider(Box::new(
558            crate::price_guard::hyperliquid::HyperliquidProvider::default(),
559        ))
560        .register_price_provider(Box::new(
561            crate::price_guard::binance_ws::BinanceWsProvider::default(),
562        ))
563    }
564
565    /// Registers a custom price provider for the price guard.
566    ///
567    /// The provider's [`start`](PriceProvider::start) method is called during
568    /// [`build`](Self::build) with the shared market data.
569    pub fn register_price_provider(mut self, provider: Box<dyn PriceProvider>) -> Self {
570        self.price_providers.push(provider);
571        self
572    }
573
574    /// Registers a [`TxDeltaIndexer`] for ephemeral pending-block simulation.
575    ///
576    /// `extractor` is the protocol synchronizer name (e.g. `"uniswap_v3"`). Only has effect
577    /// when calling [`build_with_pending`](Self::build_with_pending). VM protocols (prefix
578    /// `"vm:"`) are rejected by the underlying stream builder at build time.
579    pub fn with_pending_indexer(
580        mut self,
581        extractor: impl Into<String>,
582        indexer: Box<dyn TxDeltaIndexer>,
583    ) -> Self {
584        self.pending_indexers
585            .push((extractor.into(), indexer));
586        self
587    }
588
589    /// Enables or disables the price guard.
590    ///
591    /// When enabled, providers are started and caches stay warm. Validation
592    /// only runs for requests where the client sets `enabled: true` in
593    /// `PriceGuardConfig`. When disabled, no providers are started and
594    /// per-request attempts to use the guard return an error.
595    pub fn price_guard_enabled(mut self, enabled: bool) -> Self {
596        self.price_guard_enabled = enabled;
597        self
598    }
599
600    /// Adds a named pool using the given [`PoolConfig`].
601    ///
602    /// # Errors
603    ///
604    /// Returns [`SolverBuildError::AlgorithmConfig`] if any address in `connector_tokens` is not
605    /// valid hex.
606    pub fn add_pool(
607        mut self,
608        name: impl Into<String>,
609        config: &PoolConfig,
610    ) -> Result<Self, SolverBuildError> {
611        let connector_tokens = parse_connector_tokens(config.connector_tokens())?;
612        self.pools.push(PoolEntry::BuiltIn {
613            name: name.into(),
614            algorithm: config.algorithm().to_string(),
615            num_workers: config.num_workers(),
616            task_queue_capacity: config.task_queue_capacity(),
617            min_hops: config.min_hops(),
618            max_hops: config.max_hops(),
619            timeout_ms: config.timeout_ms(),
620            max_routes: config.max_routes(),
621            connector_tokens,
622        });
623        Ok(self)
624    }
625
626    /// Constructs all components shared between [`build`](Self::build) and
627    /// [`build_with_pending`](Self::build_with_pending).
628    fn assemble_components(mut self) -> Result<BuiltComponents, SolverBuildError> {
629        if self.pools.is_empty() {
630            return Err(SolverBuildError::NoPools);
631        }
632
633        // Add built-in providers if none were explicitly registered.
634        if self.price_providers.is_empty() {
635            self = self.add_default_price_providers();
636        }
637
638        let market_data = MarketData::new_shared();
639
640        let tycho_feed_config = TychoFeedConfig::new(
641            self.tycho_url,
642            self.chain,
643            self.tycho_api_key,
644            self.tycho_use_tls,
645            self.protocols,
646            self.min_tvl,
647        )
648        .tvl_buffer_ratio(self.tvl_buffer_ratio)
649        .reconnect_delay(self.reconnect_delay)
650        .min_token_quality(self.min_token_quality)
651        .traded_n_days_ago(self.traded_n_days_ago)
652        .blocklisted_components(self.blocklisted_components)
653        .partial_blocks(self.partial_blocks);
654
655        let ethereum_client = EthereumRpcClient::new(self.rpc_url.as_str())
656            .map_err(|e| SolverBuildError::RpcClient(e.to_string()))?;
657
658        let gas_price_fetcher =
659            GasPriceFetcher::new(ethereum_client, market_data.clone(), self.gas_refresh_interval);
660
661        let tycho_feed = TychoFeed::new(tycho_feed_config, market_data.clone());
662        let market_event_tx = tycho_feed.event_sender();
663
664        let gas_token = native_token(&self.chain).map_err(|_| SolverBuildError::GasToken)?;
665        let computation_config = ComputationManagerConfig::new()
666            .with_gas_token(gas_token)
667            .with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
668        // ComputationManager::new returns a broadcast receiver that we don't need here —
669        // workers subscribe via computation_manager.event_sender() below.
670        let (computation_manager, _) =
671            ComputationManager::new(computation_config, market_data.clone())
672                .map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;
673
674        let derived_data: SharedDerivedDataRef = computation_manager.store();
675        let derived_event_tx = computation_manager.event_sender();
676
677        // Subscribe event channels before spawning (one for computation manager + one per pool)
678        let computation_event_rx = tycho_feed.subscribe();
679        let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);
680
681        let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
682        let mut worker_pools: Vec<WorkerPool> = Vec::new();
683
684        for pool_entry in self.pools {
685            let pool_event_rx = tycho_feed.subscribe();
686            let derived_rx = derived_event_tx.subscribe();
687
688            let (worker_pool, task_handle) = match pool_entry {
689                PoolEntry::BuiltIn {
690                    name,
691                    algorithm,
692                    num_workers,
693                    task_queue_capacity,
694                    min_hops,
695                    max_hops,
696                    timeout_ms,
697                    max_routes,
698                    connector_tokens,
699                } => {
700                    let mut algo_cfg = AlgorithmConfig::new(
701                        min_hops,
702                        max_hops,
703                        Duration::from_millis(timeout_ms),
704                        max_routes,
705                    )?;
706                    if let Some(tokens) = connector_tokens {
707                        algo_cfg = algo_cfg.with_connector_tokens(tokens);
708                    }
709                    WorkerPoolBuilder::new()
710                        .name(name)
711                        .algorithm(algorithm)
712                        .algorithm_config(algo_cfg)
713                        .num_workers(num_workers)
714                        .task_queue_capacity(task_queue_capacity)
715                        .build(
716                            market_data.clone(),
717                            Arc::clone(&derived_data),
718                            pool_event_rx,
719                            derived_rx,
720                        )?
721                }
722                PoolEntry::Custom(custom) => {
723                    let algo_cfg = AlgorithmConfig::new(
724                        custom.min_hops,
725                        custom.max_hops,
726                        Duration::from_millis(custom.timeout_ms),
727                        custom.max_routes,
728                    )?;
729                    let builder = WorkerPoolBuilder::new()
730                        .name(custom.name)
731                        .algorithm_config(algo_cfg)
732                        .num_workers(custom.num_workers)
733                        .task_queue_capacity(custom.task_queue_capacity);
734                    let builder = (custom.configure)(builder);
735                    builder.build(
736                        market_data.clone(),
737                        Arc::clone(&derived_data),
738                        pool_event_rx,
739                        derived_rx,
740                    )?
741                }
742            };
743
744            solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
745            worker_pools.push(worker_pool);
746        }
747
748        let encoder = match self.encoder {
749            Some(enc) => enc,
750            None => {
751                let registry = SwapEncoderRegistry::new(self.chain)
752                    .add_default_encoders(None)
753                    .map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
754                Encoder::new(self.chain, registry)
755                    .map_err(|e| SolverBuildError::Encoder(e.to_string()))?
756            }
757        };
758
759        let chain = self.chain;
760        let router_address = encoder.router_address().clone();
761        let router_fees = encoder.router_fees();
762
763        let router_fee_fetcher = RouterFeeFetcher::new(
764            self.rpc_url.as_str(),
765            &router_address,
766            router_fees.clone(),
767            defaults::ROUTER_FEE_REFRESH_INTERVAL,
768        )
769        .map_err(|e| SolverBuildError::RouterFeeFetcher(e.to_string()))?;
770
771        // Only start price providers when the guard is enabled.
772        // When disabled, per-request attempts to enable the guard return an error.
773        let router_config = WorkerPoolRouterConfig::default()
774            .with_timeout(self.router_timeout)
775            .with_min_responses(self.router_min_responses);
776        let mut router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);
777
778        if self.price_guard_enabled {
779            let mut registry = PriceProviderRegistry::new();
780            let mut worker_handles = Vec::new();
781            for mut provider in self.price_providers {
782                worker_handles.push(provider.start(market_data.clone()));
783                registry = registry.register(provider);
784            }
785            let price_guard = PriceGuard::new(registry, worker_handles);
786            router = router.with_price_guard(price_guard);
787        }
788
789        Ok(BuiltComponents {
790            tycho_feed,
791            gas_price_fetcher,
792            router_fee_fetcher,
793            computation_manager,
794            computation_event_rx,
795            computation_shutdown_tx,
796            computation_shutdown_rx,
797            router,
798            worker_pools,
799            market_data,
800            derived_data,
801            router_fees,
802            chain,
803            router_address,
804            pending_indexers: self.pending_indexers,
805            market_event_tx,
806        })
807    }
808
809    /// Assembles and starts all solver components.
810    ///
811    /// # Errors
812    ///
813    /// Returns [`SolverBuildError`] if any component fails to initialize.
814    pub fn build(self) -> Result<Solver, SolverBuildError> {
815        let mut c = self.assemble_components()?;
816
817        let feed_handle = tokio::spawn(async move {
818            if let Err(e) = c.tycho_feed.run().await {
819                tracing::error!(error = %e, "tycho feed error");
820            }
821        });
822        let gas_price_handle = tokio::spawn(async move {
823            c.gas_price_fetcher.run().await;
824        });
825        let router_fee_handle = tokio::spawn(async move {
826            c.router_fee_fetcher.run().await;
827        });
828        let computation_handle = tokio::spawn(async move {
829            c.computation_manager
830                .run(c.computation_event_rx, c.computation_shutdown_rx)
831                .await;
832        });
833
834        Ok(Solver {
835            router: c.router,
836            worker_pools: c.worker_pools,
837            market_data: c.market_data,
838            derived_data: c.derived_data,
839            router_fees: c.router_fees,
840            feed_handle,
841            gas_price_handle,
842            router_fee_handle,
843            computation_handle,
844            computation_shutdown_tx: c.computation_shutdown_tx,
845            chain: c.chain,
846            router_address: c.router_address,
847            market_event_tx: c.market_event_tx,
848        })
849    }
850
851    /// Assembles and starts all solver components, also returning a [`PendingBlockProcessor`]
852    /// for ephemeral bundle simulation against the live Tycho market state.
853    ///
854    /// Identical to [`build`](Self::build) except the feed task runs via
855    /// `TychoFeed::run_with_pending`. The `PendingBlockProcessor` is delivered after
856    /// token loading, before the first block is processed.
857    ///
858    /// # Errors
859    ///
860    /// Returns [`SolverBuildError`] if any component fails to initialize or the pending
861    /// channel closes before delivering the processor (e.g. token loading failed).
862    pub async fn build_with_pending(
863        self,
864    ) -> Result<(Solver, PendingBlockProcessor), SolverBuildError> {
865        let mut c = self.assemble_components()?;
866
867        let (pending_tx, pending_rx) =
868            tokio::sync::oneshot::channel::<Result<PendingBlockProcessor, String>>();
869
870        let pending_indexers = c.pending_indexers;
871        let feed_handle = tokio::spawn(async move {
872            if let Err(e) = c
873                .tycho_feed
874                .run_with_pending(pending_tx, pending_indexers)
875                .await
876            {
877                tracing::error!(error = %e, "tycho feed error");
878            }
879        });
880        let gas_price_handle = tokio::spawn(async move {
881            c.gas_price_fetcher.run().await;
882        });
883        let router_fee_handle = tokio::spawn(async move {
884            c.router_fee_fetcher.run().await;
885        });
886        let computation_handle = tokio::spawn(async move {
887            c.computation_manager
888                .run(c.computation_event_rx, c.computation_shutdown_rx)
889                .await;
890        });
891
892        let pending = pending_rx
893            .await
894            .map_err(|_| SolverBuildError::PendingChannelClosed)?
895            .map_err(SolverBuildError::FeedSetup)?;
896
897        Ok((
898            Solver {
899                router: c.router,
900                worker_pools: c.worker_pools,
901                market_data: c.market_data,
902                derived_data: c.derived_data,
903                router_fees: c.router_fees,
904                feed_handle,
905                gas_price_handle,
906                router_fee_handle,
907                computation_handle,
908                computation_shutdown_tx: c.computation_shutdown_tx,
909                chain: c.chain,
910                router_address: c.router_address,
911                market_event_tx: c.market_event_tx,
912            },
913            pending,
914        ))
915    }
916}
917
918/// A running solver assembled by [`FyndBuilder`].
919pub struct Solver {
920    router: WorkerPoolRouter,
921    worker_pools: Vec<WorkerPool>,
922    market_data: MarketData,
923    derived_data: SharedDerivedDataRef,
924    router_fees: SharedRouterFees,
925    feed_handle: JoinHandle<()>,
926    gas_price_handle: JoinHandle<()>,
927    router_fee_handle: JoinHandle<()>,
928    computation_handle: JoinHandle<()>,
929    computation_shutdown_tx: broadcast::Sender<()>,
930    chain: Chain,
931    router_address: Bytes,
932    market_event_tx: broadcast::Sender<MarketEvent>,
933}
934
935impl Solver {
936    /// Returns a clone of the shared market data reference.
937    pub fn market_data(&self) -> MarketData {
938        self.market_data.clone()
939    }
940
941    /// Returns a clone of the shared derived data reference.
942    pub fn derived_data(&self) -> SharedDerivedDataRef {
943        Arc::clone(&self.derived_data)
944    }
945
946    /// Returns a new receiver for [`MarketEvent`]s broadcast by the Tycho feed.
947    ///
948    /// Each call returns an independent receiver. Events are broadcast on every block update.
949    /// Receivers created after a block has been processed will miss that block's event.
950    pub fn subscribe_market_events(&self) -> broadcast::Receiver<crate::feed::events::MarketEvent> {
951        self.market_event_tx.subscribe()
952    }
953
954    /// Submits a [`QuoteRequest`] to the worker pools and returns the best [`Quote`].
955    ///
956    /// # Errors
957    ///
958    /// Returns [`SolveError`] if all pools fail or the router timeout elapses.
959    pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
960        self.router.quote(request).await
961    }
962
963    /// Waits until the solver is ready to answer quotes.
964    ///
965    /// Ready means:
966    /// - The Tycho feed has delivered at least one market snapshot.
967    /// - The computation manager has completed at least one derived-data cycle (spot prices, pool
968    ///   depths, token gas prices).
969    /// - Router fees have been loaded from the on-chain FeeCalculator at least once.
970    ///
971    /// The method polls every 500 ms and returns as soon as all conditions are
972    /// met, or returns [`WaitReadyError`] if `timeout` elapses first.
973    ///
974    /// # Example
975    ///
976    /// ```ignore
977    /// solver.wait_until_ready(Duration::from_secs(180)).await?;
978    /// ```
979    pub async fn wait_until_ready(&self, timeout: Duration) -> Result<(), WaitReadyError> {
980        const POLL_INTERVAL: Duration = Duration::from_millis(500);
981
982        let deadline = tokio::time::Instant::now() + timeout;
983
984        loop {
985            let market_ready = self
986                .market_data
987                .read()
988                .await
989                .last_updated()
990                .is_some();
991            let derived_ready = self
992                .derived_data
993                .read()
994                .await
995                .derived_data_ready();
996
997            if market_ready && derived_ready {
998                return Ok(());
999            }
1000
1001            if tokio::time::Instant::now() >= deadline {
1002                return Err(WaitReadyError { timeout_ms: timeout.as_millis() as u64 });
1003            }
1004
1005            tokio::time::sleep(POLL_INTERVAL).await;
1006        }
1007    }
1008
1009    /// Build a Solver by replaying recorded market updates.
1010    ///
1011    /// Creates the full pipeline (feed -> derived data -> worker pools -> router)
1012    /// from pre-recorded data instead of a live Tycho connection. The returned
1013    /// Solver behaves identically to a live one — call [`wait_until_ready`](Self::wait_until_ready)
1014    /// then [`quote`](Self::quote).
1015    ///
1016    /// VM-backed protocol states that couldn't be serialized will be absent from
1017    /// the recording. Pools without states will be registered as components but
1018    /// won't contribute to routing.
1019    ///
1020    /// Requires the `test-utils` feature.
1021    #[cfg(feature = "test-utils")]
1022    pub async fn from_recording(
1023        chain: Chain,
1024        updates: Vec<tycho_simulation::protocol::models::Update>,
1025        pools: std::collections::HashMap<String, PoolConfig>,
1026        gas_price_wei: Option<num_bigint::BigUint>,
1027    ) -> Result<Self, SolverBuildError> {
1028        if pools.is_empty() {
1029            return Err(SolverBuildError::NoPools);
1030        }
1031
1032        let market_data = MarketData::new_shared();
1033
1034        // Replay updates through TychoFeed (stays pub(crate))
1035        let feed_config =
1036            TychoFeedConfig::new("ws://replay".to_string(), chain, None, false, vec![], 0.0);
1037        let feed = TychoFeed::new(feed_config, market_data.clone());
1038        let market_event_tx = feed.event_sender();
1039        let _feed_rx = feed.subscribe();
1040
1041        for update in updates {
1042            feed.handle_tycho_message(update)
1043                .await
1044                .map_err(|e| SolverBuildError::Replay(e.to_string()))?;
1045        }
1046
1047        // Inject gas price (recorded value or default 10 gwei)
1048        let gas_price = match gas_price_wei {
1049            Some(price) => price,
1050            None => {
1051                tracing::warn!("no recorded gas price, defaulting to 10 gwei");
1052                num_bigint::BigUint::from(10_000_000_000u64)
1053            }
1054        };
1055        let block_number = match market_data.read().await.last_updated() {
1056            Some(block) => block.number(),
1057            None => {
1058                tracing::warn!("no block number from replayed updates, defaulting to 0");
1059                0
1060            }
1061        };
1062        {
1063            let mut market = market_data.write().await;
1064            market.update_gas_price(BlockGasPrice {
1065                block_number,
1066                block_hash: Default::default(),
1067                block_timestamp: 0,
1068                pricing: GasPrice::Legacy { gas_price },
1069            });
1070        }
1071
1072        // Computation manager
1073        let gas_token = native_token(&chain).map_err(|_| SolverBuildError::GasToken)?;
1074        let computation_config = ComputationManagerConfig::new()
1075            .with_gas_token(gas_token)
1076            .with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
1077        let (computation_manager, _) =
1078            ComputationManager::new(computation_config, market_data.clone())
1079                .map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;
1080
1081        let derived_data: SharedDerivedDataRef = computation_manager.store();
1082        let derived_event_tx = computation_manager.event_sender();
1083
1084        let computation_event_rx = feed.subscribe();
1085        let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);
1086
1087        let computation_handle = tokio::spawn(async move {
1088            computation_manager
1089                .run(computation_event_rx, computation_shutdown_rx)
1090                .await;
1091        });
1092
1093        // Build worker pools BEFORE sending MarketUpdated
1094        let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
1095        let mut worker_pools: Vec<WorkerPool> = Vec::new();
1096        let mut max_timeout_ms = 0u64;
1097
1098        for (name, pool_cfg) in &pools {
1099            let algo_cfg = AlgorithmConfig::new(
1100                pool_cfg.min_hops(),
1101                pool_cfg.max_hops(),
1102                Duration::from_millis(pool_cfg.timeout_ms()),
1103                pool_cfg.max_routes(),
1104            )?;
1105
1106            let pool_event_rx = feed.subscribe();
1107            let derived_rx = derived_event_tx.subscribe();
1108
1109            let (worker_pool, task_handle) = WorkerPoolBuilder::new()
1110                .name(name.clone())
1111                .algorithm(pool_cfg.algorithm().to_string())
1112                .algorithm_config(algo_cfg)
1113                .num_workers(pool_cfg.num_workers())
1114                .task_queue_capacity(pool_cfg.task_queue_capacity())
1115                .build(market_data.clone(), Arc::clone(&derived_data), pool_event_rx, derived_rx)?;
1116
1117            solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
1118            max_timeout_ms = max_timeout_ms.max(pool_cfg.timeout_ms());
1119            worker_pools.push(worker_pool);
1120        }
1121
1122        // Encoder + router
1123        let encoder = {
1124            let registry = SwapEncoderRegistry::new(chain)
1125                .add_default_encoders(None)
1126                .map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
1127            Encoder::new(chain, registry).map_err(|e| SolverBuildError::Encoder(e.to_string()))?
1128        };
1129
1130        let router_address = encoder.router_address().clone();
1131        // Replay mode has no FeeCalculator to read; seed a zero-fee config at the standard
1132        // 8-decimal scale so the recording-based solver reports ready (integration tests do
1133        // not exercise encoding).
1134        let router_fees = encoder.router_fees();
1135        router_fees.set(crate::encoding::router_fees::RouterFees::new(
1136            100_000_000,
1137            0,
1138            0,
1139            std::collections::HashMap::new(),
1140        ));
1141        let router_config = WorkerPoolRouterConfig::default()
1142            .with_timeout(Duration::from_millis(max_timeout_ms.max(5000)))
1143            .with_min_responses(defaults::ROUTER_MIN_RESPONSES);
1144        let router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);
1145
1146        // Trigger derived data computation
1147        let market_read = market_data.read().await;
1148        let added = market_read.component_topology();
1149        drop(market_read);
1150
1151        if market_event_tx
1152            .send(MarketEvent::MarketUpdated {
1153                added_components: added,
1154                removed_components: vec![],
1155                updated_components: vec![],
1156            })
1157            .is_err()
1158        {
1159            tracing::warn!("no receivers for initial MarketUpdated broadcast");
1160        }
1161
1162        // Dummy handles for feed/gas/router-fees (not running in replay mode). The market
1163        // event channel stays alive through the `market_event_tx` field on `Solver`.
1164        let feed_handle = tokio::spawn(futures::future::pending::<()>());
1165        let gas_price_handle = tokio::spawn(async { /* no-op */ });
1166        let router_fee_handle = tokio::spawn(async { /* no-op */ });
1167
1168        Ok(Solver {
1169            router,
1170            worker_pools,
1171            market_data,
1172            derived_data,
1173            router_fees,
1174            feed_handle,
1175            gas_price_handle,
1176            router_fee_handle,
1177            computation_handle,
1178            computation_shutdown_tx,
1179            chain,
1180            router_address,
1181            market_event_tx,
1182        })
1183    }
1184
1185    /// Signals all worker pools and the computation manager to stop, then aborts background tasks.
1186    pub fn shutdown(self) {
1187        let _ = self.computation_shutdown_tx.send(());
1188        for pool in self.worker_pools {
1189            pool.shutdown();
1190        }
1191        self.feed_handle.abort();
1192        self.gas_price_handle.abort();
1193        self.router_fee_handle.abort();
1194    }
1195
1196    /// Consumes the solver into its raw parts for callers that add their own layer.
1197    pub fn into_parts(self) -> SolverParts {
1198        SolverParts {
1199            router: self.router,
1200            worker_pools: self.worker_pools,
1201            market_data: self.market_data,
1202            derived_data: self.derived_data,
1203            router_fees: self.router_fees,
1204            feed_handle: self.feed_handle,
1205            gas_price_handle: self.gas_price_handle,
1206            router_fee_handle: self.router_fee_handle,
1207            computation_handle: self.computation_handle,
1208            computation_shutdown_tx: self.computation_shutdown_tx,
1209            chain: self.chain,
1210            router_address: self.router_address,
1211        }
1212    }
1213}
1214
1215/// Raw components of a [`Solver`], for callers adding their own layer (e.g., an HTTP server).
1216///
1217/// Obtained via [`Solver::into_parts`].
1218pub struct SolverParts {
1219    /// Routes quote requests across worker pools.
1220    router: WorkerPoolRouter,
1221    /// One [`WorkerPool`] per configured algorithm pool.
1222    worker_pools: Vec<WorkerPool>,
1223    /// Live market snapshot shared across all components.
1224    market_data: MarketData,
1225    /// Derived on-chain data (spot prices, depths, gas costs) shared across all components.
1226    derived_data: SharedDerivedDataRef,
1227    /// Router fee configuration, refreshed from chain by the router-fee fetcher.
1228    router_fees: SharedRouterFees,
1229    /// Background task running the Tycho market-data feed.
1230    feed_handle: JoinHandle<()>,
1231    /// Background task polling the RPC node for gas prices.
1232    gas_price_handle: JoinHandle<()>,
1233    /// Background task refreshing router fees from the on-chain FeeCalculator.
1234    router_fee_handle: JoinHandle<()>,
1235    /// Background task running the computation manager.
1236    computation_handle: JoinHandle<()>,
1237    /// Send a unit value on this channel to trigger a graceful computation-manager shutdown.
1238    computation_shutdown_tx: broadcast::Sender<()>,
1239    /// Chain this solver is configured for.
1240    chain: Chain,
1241    /// Address of the Tycho Router contract on this chain.
1242    router_address: Bytes,
1243}
1244
1245impl SolverParts {
1246    /// Returns the chain this solver is configured for.
1247    pub fn chain(&self) -> Chain {
1248        self.chain
1249    }
1250
1251    /// Returns the Tycho Router contract address for this chain.
1252    pub fn router_address(&self) -> &Bytes {
1253        &self.router_address
1254    }
1255
1256    /// Returns a reference to the worker pools.
1257    pub fn worker_pools(&self) -> &[WorkerPool] {
1258        &self.worker_pools
1259    }
1260
1261    /// Returns a reference to the shared market data.
1262    pub fn market_data(&self) -> &MarketData {
1263        &self.market_data
1264    }
1265
1266    /// Returns a reference to the shared derived data.
1267    pub fn derived_data(&self) -> &SharedDerivedDataRef {
1268        &self.derived_data
1269    }
1270
1271    /// Returns a reference to the shared router fee configuration.
1272    pub fn router_fees(&self) -> &SharedRouterFees {
1273        &self.router_fees
1274    }
1275
1276    /// Consumes the parts and returns the router.
1277    pub fn into_router(self) -> WorkerPoolRouter {
1278        self.router
1279    }
1280
1281    /// Consumes the parts, returning all owned components.
1282    #[allow(clippy::type_complexity)]
1283    pub fn into_components(
1284        self,
1285    ) -> (
1286        WorkerPoolRouter,
1287        Vec<WorkerPool>,
1288        MarketData,
1289        SharedDerivedDataRef,
1290        JoinHandle<()>,
1291        JoinHandle<()>,
1292        JoinHandle<()>,
1293        JoinHandle<()>,
1294        broadcast::Sender<()>,
1295    ) {
1296        (
1297            self.router,
1298            self.worker_pools,
1299            self.market_data,
1300            self.derived_data,
1301            self.feed_handle,
1302            self.gas_price_handle,
1303            self.router_fee_handle,
1304            self.computation_handle,
1305            self.computation_shutdown_tx,
1306        )
1307    }
1308}