Skip to main content

fynd_core/
solver.rs

1//! High-level solver setup via [`FyndBuilder`].
2//!
3//! [`FyndBuilder`] assembles the full Tycho feed + gas fetcher + computation
4//! manager + one or more worker pools + encoder + router pipeline with sensible
5//! defaults. For simple cases a single call chain is all that's needed:
6//!
7//! ```ignore
8//! let solver = FyndBuilder::new(chain, tycho_url, rpc_url, protocols, min_tvl)
9//!     .tycho_api_key(key)
10//!     .algorithm("most_liquid")
11//!     .build()?;
12//! ```
13use std::{collections::HashSet, sync::Arc, time::Duration};
14
15use num_cpus;
16use serde::{Deserialize, Serialize};
17use tokio::{sync::broadcast, task::JoinHandle};
18use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
19use tycho_simulation::{
20    tycho_common::{models::Chain, Bytes},
21    tycho_ethereum::rpc::EthereumRpcClient,
22};
23
24use crate::{
25    algorithm::{AlgorithmConfig, AlgorithmError},
26    derived::{ComputationManager, ComputationManagerConfig, SharedDerivedDataRef},
27    encoding::encoder::Encoder,
28    feed::{
29        events::MarketEventHandler,
30        gas::GasPriceFetcher,
31        market_data::{SharedMarketData, SharedMarketDataRef},
32        tycho_feed::TychoFeed,
33        TychoFeedConfig,
34    },
35    graph::EdgeWeightUpdaterWithDerived,
36    price_guard::{
37        guard::PriceGuard, provider::PriceProvider, provider_registry::PriceProviderRegistry,
38    },
39    types::constants::native_token,
40    worker_pool::{
41        pool::{WorkerPool, WorkerPoolBuilder},
42        registry::UnknownAlgorithmError,
43    },
44    worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter},
45    Algorithm, Quote, QuoteRequest, SolveError,
46};
47
48/// Default values for [`FyndBuilder`] configuration and [`PoolConfig`] deserialization.
49///
50/// These are the single source of truth for all tunable defaults. Downstream
51/// crates (e.g. `fynd-rpc`) should re-export or reference these rather than
52/// redeclaring their own copies.
53pub mod defaults {
54    use std::time::Duration;
55
56    /// Minimum token quality score required for a token to be included in routing.
57    pub const MIN_TOKEN_QUALITY: i32 = 100;
58    /// Maximum age (in days) of trading history required for a token to be considered liquid.
59    pub const TRADED_N_DAYS_AGO: u64 = 3;
60    /// Multiplier applied to a pool's TVL when estimating available liquidity.
61    pub const TVL_BUFFER_RATIO: f64 = 1.1;
62    /// How often the gas price is refreshed from the RPC node.
63    pub const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
64    /// Delay before reconnecting to the Tycho feed after a disconnect.
65    pub const RECONNECT_DELAY: Duration = Duration::from_secs(5);
66    /// Minimum number of solver pool responses required before returning a quote (`0` = wait for
67    /// all).
68    pub const ROUTER_MIN_RESPONSES: usize = 0;
69    /// Capacity of the task queue for each worker pool.
70    pub const POOL_TASK_QUEUE_CAPACITY: usize = 1000;
71    /// Minimum number of hops allowed in a route.
72    pub const POOL_MIN_HOPS: usize = 1;
73    /// Maximum number of hops allowed in a route.
74    pub const POOL_MAX_HOPS: usize = 3;
75    /// Per-pool solve timeout in milliseconds.
76    pub const POOL_TIMEOUT_MS: u64 = 100;
77}
78
79// Internal-only defaults not shared with downstream crates.
80const DEFAULT_TYCHO_USE_TLS: bool = true;
81const DEFAULT_DEPTH_SLIPPAGE_THRESHOLD: f64 = 0.01;
82/// Generous router timeout for standalone (non-server) use. HTTP services should
83/// override this to a tighter value appropriate for their SLA.
84const DEFAULT_ROUTER_TIMEOUT: Duration = Duration::from_secs(10);
85
86// serde requires free functions for `#[serde(default = "...")]` — these delegate to the
87// defaults module so both deserialization and the builder stay in sync.
88fn default_task_queue_capacity() -> usize {
89    defaults::POOL_TASK_QUEUE_CAPACITY
90}
91
92fn default_min_hops() -> usize {
93    defaults::POOL_MIN_HOPS
94}
95
96fn default_max_hops() -> usize {
97    defaults::POOL_MAX_HOPS
98}
99
100fn default_algo_timeout_ms() -> u64 {
101    defaults::POOL_TIMEOUT_MS
102}
103
104/// Per-pool configuration for [`FyndBuilder::add_pool`].
105#[must_use]
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct PoolConfig {
108    /// Algorithm name for this pool (e.g., `"most_liquid"`).
109    algorithm: String,
110    /// Number of worker threads for this pool.
111    #[serde(default = "num_cpus::get")]
112    num_workers: usize,
113    /// Task queue capacity for this pool.
114    #[serde(default = "default_task_queue_capacity")]
115    task_queue_capacity: usize,
116    /// Minimum hops to search (must be >= 1).
117    #[serde(default = "default_min_hops")]
118    min_hops: usize,
119    /// Maximum hops to search.
120    #[serde(default = "default_max_hops")]
121    max_hops: usize,
122    /// Timeout for solving in milliseconds.
123    #[serde(default = "default_algo_timeout_ms")]
124    timeout_ms: u64,
125    /// Maximum number of paths to simulate per solve. `None` simulates all scored paths.
126    #[serde(default)]
127    max_routes: Option<usize>,
128}
129
130impl PoolConfig {
131    /// Creates a new pool config with the given algorithm name and defaults for all other fields.
132    pub fn new(algorithm: impl Into<String>) -> Self {
133        Self {
134            algorithm: algorithm.into(),
135            num_workers: num_cpus::get(),
136            task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
137            min_hops: defaults::POOL_MIN_HOPS,
138            max_hops: defaults::POOL_MAX_HOPS,
139            timeout_ms: defaults::POOL_TIMEOUT_MS,
140            max_routes: None,
141        }
142    }
143
144    /// Returns the algorithm name.
145    pub fn algorithm(&self) -> &str {
146        &self.algorithm
147    }
148
149    /// Returns the number of worker threads.
150    pub fn num_workers(&self) -> usize {
151        self.num_workers
152    }
153
154    /// Sets the number of worker threads.
155    pub fn with_num_workers(mut self, num_workers: usize) -> Self {
156        self.num_workers = num_workers;
157        self
158    }
159
160    /// Sets the task queue capacity.
161    pub fn with_task_queue_capacity(mut self, task_queue_capacity: usize) -> Self {
162        self.task_queue_capacity = task_queue_capacity;
163        self
164    }
165
166    /// Sets the minimum hops.
167    pub fn with_min_hops(mut self, min_hops: usize) -> Self {
168        self.min_hops = min_hops;
169        self
170    }
171
172    /// Sets the maximum hops.
173    pub fn with_max_hops(mut self, max_hops: usize) -> Self {
174        self.max_hops = max_hops;
175        self
176    }
177
178    /// Sets the timeout in milliseconds.
179    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
180        self.timeout_ms = timeout_ms;
181        self
182    }
183
184    /// Sets the maximum number of routes to simulate.
185    pub fn with_max_routes(mut self, max_routes: Option<usize>) -> Self {
186        self.max_routes = max_routes;
187        self
188    }
189
190    /// Returns the task queue capacity.
191    pub fn task_queue_capacity(&self) -> usize {
192        self.task_queue_capacity
193    }
194
195    /// Returns the minimum hops.
196    pub fn min_hops(&self) -> usize {
197        self.min_hops
198    }
199
200    /// Returns the maximum hops.
201    pub fn max_hops(&self) -> usize {
202        self.max_hops
203    }
204
205    /// Returns the timeout in milliseconds.
206    pub fn timeout_ms(&self) -> u64 {
207        self.timeout_ms
208    }
209
210    /// Returns the maximum number of routes to simulate.
211    pub fn max_routes(&self) -> Option<usize> {
212        self.max_routes
213    }
214}
215
216/// Error returned by [`Solver::wait_until_ready`].
217#[derive(Debug, thiserror::Error)]
218#[error("timed out after {timeout_ms}ms waiting for market data and derived computations")]
219pub struct WaitReadyError {
220    timeout_ms: u64,
221}
222
223/// Error returned by [`FyndBuilder::build`].
224#[non_exhaustive]
225#[derive(Debug, thiserror::Error)]
226pub enum SolverBuildError {
227    /// The Ethereum RPC client could not be created (e.g. malformed URL).
228    #[error("failed to create ethereum RPC client: {0}")]
229    RpcClient(String),
230    /// An invalid algorithm configuration was supplied.
231    #[error(transparent)]
232    AlgorithmConfig(#[from] AlgorithmError),
233    /// The [`ComputationManager`] failed to initialise.
234    #[error("failed to create computation manager: {0}")]
235    ComputationManager(String),
236    /// The swap encoder could not be created for the target chain.
237    #[error("failed to create encoder: {0}")]
238    Encoder(String),
239    /// A pool referenced an algorithm name that is not registered.
240    #[error(transparent)]
241    UnknownAlgorithm(#[from] UnknownAlgorithmError),
242    /// No native gas token is defined for the requested chain.
243    #[error("gas token not configured for chain")]
244    GasToken,
245    /// [`FyndBuilder::build`] was called without configuring any worker pools.
246    #[error("no worker pools configured")]
247    NoPools,
248}
249
250/// Internal pool entry — either a built-in algorithm (by name) or a custom one.
251enum PoolEntry {
252    BuiltIn {
253        name: String,
254        algorithm: String,
255        num_workers: usize,
256        task_queue_capacity: usize,
257        min_hops: usize,
258        max_hops: usize,
259        timeout_ms: u64,
260        max_routes: Option<usize>,
261    },
262    Custom(CustomPoolEntry),
263}
264
265/// Pool entry backed by a custom [`Algorithm`] implementation.
266struct CustomPoolEntry {
267    name: String,
268    num_workers: usize,
269    task_queue_capacity: usize,
270    min_hops: usize,
271    max_hops: usize,
272    timeout_ms: u64,
273    max_routes: Option<usize>,
274    /// Applies the custom algorithm to a `WorkerPoolBuilder`.
275    configure: Box<dyn FnOnce(WorkerPoolBuilder) -> WorkerPoolBuilder + Send>,
276}
277
278/// Builder for assembling the full solver pipeline.
279#[must_use = "a builder does nothing until .build() is called"]
280/// Configures the Tycho market-data feed, gas price fetcher, derived-data
281/// computation manager, one or more worker pools, encoder, and router.
282pub struct FyndBuilder {
283    chain: Chain,
284    tycho_url: String,
285    rpc_url: String,
286    protocols: Vec<String>,
287    min_tvl: f64,
288    tycho_api_key: Option<String>,
289    tycho_use_tls: bool,
290    min_token_quality: i32,
291    traded_n_days_ago: u64,
292    tvl_buffer_ratio: f64,
293    gas_refresh_interval: Duration,
294    reconnect_delay: Duration,
295    blocklisted_components: HashSet<String>,
296    router_timeout: Duration,
297    router_min_responses: usize,
298    encoder: Option<Encoder>,
299    pools: Vec<PoolEntry>,
300    price_guard_enabled: bool,
301    price_providers: Vec<Box<dyn PriceProvider>>,
302}
303
304impl FyndBuilder {
305    /// Creates a new builder with the required parameters.
306    pub fn new(
307        chain: Chain,
308        tycho_url: impl Into<String>,
309        rpc_url: impl Into<String>,
310        protocols: Vec<String>,
311        min_tvl: f64,
312    ) -> Self {
313        Self {
314            chain,
315            tycho_url: tycho_url.into(),
316            rpc_url: rpc_url.into(),
317            protocols,
318            min_tvl,
319            tycho_api_key: None,
320            tycho_use_tls: DEFAULT_TYCHO_USE_TLS,
321            min_token_quality: defaults::MIN_TOKEN_QUALITY,
322            traded_n_days_ago: defaults::TRADED_N_DAYS_AGO,
323            tvl_buffer_ratio: defaults::TVL_BUFFER_RATIO,
324            gas_refresh_interval: defaults::GAS_REFRESH_INTERVAL,
325            reconnect_delay: defaults::RECONNECT_DELAY,
326            blocklisted_components: HashSet::new(),
327            router_timeout: DEFAULT_ROUTER_TIMEOUT,
328            router_min_responses: defaults::ROUTER_MIN_RESPONSES,
329            encoder: None,
330            pools: Vec::new(),
331            price_guard_enabled: false,
332            price_providers: Vec::new(),
333        }
334    }
335
336    /// The blockchain this builder is configured for.
337    pub fn chain(&self) -> Chain {
338        self.chain
339    }
340
341    /// Sets the Tycho API key.
342    pub fn tycho_api_key(mut self, key: impl Into<String>) -> Self {
343        self.tycho_api_key = Some(key.into());
344        self
345    }
346
347    /// Overrides the minimum TVL filter set in [`FyndBuilder::new`].
348    pub fn min_tvl(mut self, min_tvl: f64) -> Self {
349        self.min_tvl = min_tvl;
350        self
351    }
352
353    /// Enables or disables TLS for the Tycho WebSocket connection (default: `true`).
354    pub fn tycho_use_tls(mut self, use_tls: bool) -> Self {
355        self.tycho_use_tls = use_tls;
356        self
357    }
358
359    /// Sets the minimum token quality score; tokens below this threshold are excluded (default:
360    /// 100).
361    pub fn min_token_quality(mut self, quality: i32) -> Self {
362        self.min_token_quality = quality;
363        self
364    }
365
366    /// Filters out pools whose last trade is older than `days` days (default: 3).
367    pub fn traded_n_days_ago(mut self, days: u64) -> Self {
368        self.traded_n_days_ago = days;
369        self
370    }
371
372    /// Multiplies reported TVL by `ratio` before applying the `min_tvl` filter (default: 1.1).
373    pub fn tvl_buffer_ratio(mut self, ratio: f64) -> Self {
374        self.tvl_buffer_ratio = ratio;
375        self
376    }
377
378    /// Sets how often the gas price is refreshed from the RPC node (default: 30 s).
379    pub fn gas_refresh_interval(mut self, interval: Duration) -> Self {
380        self.gas_refresh_interval = interval;
381        self
382    }
383
384    /// Sets the delay before reconnecting to Tycho after a disconnection (default: 5 s).
385    pub fn reconnect_delay(mut self, delay: Duration) -> Self {
386        self.reconnect_delay = delay;
387        self
388    }
389
390    /// Sets component IDs to exclude from the Tycho stream.
391    pub fn blocklisted_components(mut self, components: HashSet<String>) -> Self {
392        self.blocklisted_components = components;
393        self
394    }
395
396    /// Sets the worker router timeout (default: 10s).
397    pub fn worker_router_timeout(mut self, timeout: Duration) -> Self {
398        self.router_timeout = timeout;
399        self
400    }
401
402    /// Sets the minimum number of solver responses before early return (default: 0).
403    pub fn worker_router_min_responses(mut self, min: usize) -> Self {
404        self.router_min_responses = min;
405        self
406    }
407
408    /// Overrides the default encoder.
409    pub fn encoder(mut self, encoder: Encoder) -> Self {
410        self.encoder = Some(encoder);
411        self
412    }
413
414    /// Shorthand: adds a single pool named `"default"` using a built-in algorithm by name.
415    pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
416        self.pools.push(PoolEntry::BuiltIn {
417            name: "default".to_string(),
418            algorithm: algorithm.into(),
419            num_workers: num_cpus::get(),
420            task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
421            min_hops: defaults::POOL_MIN_HOPS,
422            max_hops: defaults::POOL_MAX_HOPS,
423            timeout_ms: defaults::POOL_TIMEOUT_MS,
424            max_routes: None,
425        });
426        self
427    }
428
429    /// Shorthand: adds a single pool with a custom [`Algorithm`] implementation.
430    ///
431    /// The `factory` closure is called once per worker thread.
432    pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
433    where
434        A: Algorithm + 'static,
435        A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
436        F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
437    {
438        let name = name.into();
439        let algo_name = name.clone();
440        let configure =
441            Box::new(move |builder: WorkerPoolBuilder| builder.with_algorithm(algo_name, factory));
442        self.pools
443            .push(PoolEntry::Custom(CustomPoolEntry {
444                name,
445                num_workers: num_cpus::get(),
446                task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
447                min_hops: defaults::POOL_MIN_HOPS,
448                max_hops: defaults::POOL_MAX_HOPS,
449                timeout_ms: defaults::POOL_TIMEOUT_MS,
450                max_routes: None,
451                configure,
452            }));
453        self
454    }
455
456    /// Registers the built-in price providers (Hyperliquid + Binance).
457    ///
458    /// Called automatically during [`build`](Self::build) if no providers have been
459    /// registered and the price guard is not disabled. To use only custom
460    /// providers, call [`register_price_provider`](Self::register_price_provider)
461    /// before `build()` and the defaults will be skipped.
462    pub fn add_default_price_providers(self) -> Self {
463        self.register_price_provider(Box::new(
464            crate::price_guard::hyperliquid::HyperliquidProvider::default(),
465        ))
466        .register_price_provider(Box::new(
467            crate::price_guard::binance_ws::BinanceWsProvider::default(),
468        ))
469    }
470
471    /// Registers a custom price provider for the price guard.
472    ///
473    /// The provider's [`start`](PriceProvider::start) method is called during
474    /// [`build`](Self::build) with the shared market data.
475    pub fn register_price_provider(mut self, provider: Box<dyn PriceProvider>) -> Self {
476        self.price_providers.push(provider);
477        self
478    }
479
480    /// Enables or disables the price guard.
481    ///
482    /// When enabled, providers are started and caches stay warm. Validation
483    /// only runs for requests where the client sets `enabled: true` in
484    /// `PriceGuardConfig`. When disabled, no providers are started and
485    /// per-request attempts to use the guard return an error.
486    pub fn price_guard_enabled(mut self, enabled: bool) -> Self {
487        self.price_guard_enabled = enabled;
488        self
489    }
490
491    /// Adds a named pool using the given [`PoolConfig`].
492    pub fn add_pool(mut self, name: impl Into<String>, config: &PoolConfig) -> Self {
493        self.pools.push(PoolEntry::BuiltIn {
494            name: name.into(),
495            algorithm: config.algorithm().to_string(),
496            num_workers: config.num_workers(),
497            task_queue_capacity: config.task_queue_capacity(),
498            min_hops: config.min_hops(),
499            max_hops: config.max_hops(),
500            timeout_ms: config.timeout_ms(),
501            max_routes: config.max_routes(),
502        });
503        self
504    }
505
506    /// Assembles and starts all solver components.
507    ///
508    /// # Errors
509    ///
510    /// Returns [`SolverBuildError`] if any component fails to initialize.
511    pub fn build(mut self) -> Result<Solver, SolverBuildError> {
512        if self.pools.is_empty() {
513            return Err(SolverBuildError::NoPools);
514        }
515
516        // Add built-in providers if none were explicitly registered.
517        if self.price_providers.is_empty() {
518            self = self.add_default_price_providers();
519        }
520
521        let market_data = Arc::new(tokio::sync::RwLock::new(SharedMarketData::new()));
522
523        let tycho_feed_config = TychoFeedConfig::new(
524            self.tycho_url,
525            self.chain,
526            self.tycho_api_key,
527            self.tycho_use_tls,
528            self.protocols,
529            self.min_tvl,
530        )
531        .tvl_buffer_ratio(self.tvl_buffer_ratio)
532        .gas_refresh_interval(self.gas_refresh_interval)
533        .reconnect_delay(self.reconnect_delay)
534        .min_token_quality(self.min_token_quality)
535        .traded_n_days_ago(self.traded_n_days_ago)
536        .blocklisted_components(self.blocklisted_components);
537
538        let ethereum_client = EthereumRpcClient::new(self.rpc_url.as_str())
539            .map_err(|e| SolverBuildError::RpcClient(e.to_string()))?;
540
541        let (mut gas_price_fetcher, gas_price_worker_signal_tx) =
542            GasPriceFetcher::new(ethereum_client, Arc::clone(&market_data));
543
544        let mut tycho_feed = TychoFeed::new(tycho_feed_config, Arc::clone(&market_data));
545        tycho_feed = tycho_feed.with_gas_price_worker_signal_tx(gas_price_worker_signal_tx);
546
547        let gas_token = native_token(&self.chain).map_err(|_| SolverBuildError::GasToken)?;
548        let computation_config = ComputationManagerConfig::new()
549            .with_gas_token(gas_token)
550            .with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
551        // ComputationManager::new returns a broadcast receiver that we don't need here —
552        // workers subscribe via computation_manager.event_sender() below.
553        let (computation_manager, _) =
554            ComputationManager::new(computation_config, Arc::clone(&market_data))
555                .map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;
556
557        let derived_data: SharedDerivedDataRef = computation_manager.store();
558        let derived_event_tx = computation_manager.event_sender();
559
560        // Subscribe event channels before spawning (one for computation manager + one per pool)
561        let computation_event_rx = tycho_feed.subscribe();
562        let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);
563
564        let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
565        let mut worker_pools: Vec<WorkerPool> = Vec::new();
566
567        for pool_entry in self.pools {
568            let pool_event_rx = tycho_feed.subscribe();
569            let derived_rx = derived_event_tx.subscribe();
570
571            let (worker_pool, task_handle) = match pool_entry {
572                PoolEntry::BuiltIn {
573                    name,
574                    algorithm,
575                    num_workers,
576                    task_queue_capacity,
577                    min_hops,
578                    max_hops,
579                    timeout_ms,
580                    max_routes,
581                } => {
582                    let algo_cfg = AlgorithmConfig::new(
583                        min_hops,
584                        max_hops,
585                        Duration::from_millis(timeout_ms),
586                        max_routes,
587                    )?;
588                    WorkerPoolBuilder::new()
589                        .name(name)
590                        .algorithm(algorithm)
591                        .algorithm_config(algo_cfg)
592                        .num_workers(num_workers)
593                        .task_queue_capacity(task_queue_capacity)
594                        .build(
595                            Arc::clone(&market_data),
596                            Arc::clone(&derived_data),
597                            pool_event_rx,
598                            derived_rx,
599                        )?
600                }
601                PoolEntry::Custom(custom) => {
602                    let algo_cfg = AlgorithmConfig::new(
603                        custom.min_hops,
604                        custom.max_hops,
605                        Duration::from_millis(custom.timeout_ms),
606                        custom.max_routes,
607                    )?;
608                    let builder = WorkerPoolBuilder::new()
609                        .name(custom.name)
610                        .algorithm_config(algo_cfg)
611                        .num_workers(custom.num_workers)
612                        .task_queue_capacity(custom.task_queue_capacity);
613                    let builder = (custom.configure)(builder);
614                    builder.build(
615                        Arc::clone(&market_data),
616                        Arc::clone(&derived_data),
617                        pool_event_rx,
618                        derived_rx,
619                    )?
620                }
621            };
622
623            solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
624            worker_pools.push(worker_pool);
625        }
626
627        let encoder = match self.encoder {
628            Some(enc) => enc,
629            None => {
630                let registry = SwapEncoderRegistry::new(self.chain)
631                    .add_default_encoders(None)
632                    .map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
633                Encoder::new(self.chain, registry)
634                    .map_err(|e| SolverBuildError::Encoder(e.to_string()))?
635            }
636        };
637
638        let chain = self.chain;
639        let router_address = encoder.router_address().clone();
640
641        // Only start price providers when the guard is enabled.
642        // When disabled, per-request attempts to enable the guard return an error.
643        let router_config = WorkerPoolRouterConfig::default()
644            .with_timeout(self.router_timeout)
645            .with_min_responses(self.router_min_responses);
646        let mut router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);
647
648        if self.price_guard_enabled {
649            let mut registry = PriceProviderRegistry::new();
650            let mut worker_handles = Vec::new();
651            for mut provider in self.price_providers {
652                worker_handles.push(provider.start(Arc::clone(&market_data)));
653                registry = registry.register(provider);
654            }
655            let price_guard = PriceGuard::new(registry, worker_handles);
656            router = router.with_price_guard(price_guard);
657        }
658
659        let feed_handle = tokio::spawn(async move {
660            if let Err(e) = tycho_feed.run().await {
661                tracing::error!(error = %e, "tycho feed error");
662            }
663        });
664
665        let gas_price_handle = tokio::spawn(async move {
666            if let Err(e) = gas_price_fetcher.run().await {
667                tracing::error!(error = %e, "gas price fetcher error");
668            }
669        });
670
671        let computation_handle = tokio::spawn(async move {
672            computation_manager
673                .run(computation_event_rx, computation_shutdown_rx)
674                .await;
675        });
676
677        Ok(Solver {
678            router,
679            worker_pools,
680            market_data,
681            derived_data,
682            feed_handle,
683            gas_price_handle,
684            computation_handle,
685            computation_shutdown_tx,
686            chain,
687            router_address,
688        })
689    }
690}
691
692/// A running solver assembled by [`FyndBuilder`].
693pub struct Solver {
694    router: WorkerPoolRouter,
695    worker_pools: Vec<WorkerPool>,
696    market_data: SharedMarketDataRef,
697    derived_data: SharedDerivedDataRef,
698    feed_handle: JoinHandle<()>,
699    gas_price_handle: JoinHandle<()>,
700    computation_handle: JoinHandle<()>,
701    computation_shutdown_tx: broadcast::Sender<()>,
702    chain: Chain,
703    router_address: Bytes,
704}
705
706impl Solver {
707    /// Returns a clone of the shared market data reference.
708    pub fn market_data(&self) -> SharedMarketDataRef {
709        Arc::clone(&self.market_data)
710    }
711
712    /// Returns a clone of the shared derived data reference.
713    pub fn derived_data(&self) -> SharedDerivedDataRef {
714        Arc::clone(&self.derived_data)
715    }
716
717    /// Submits a [`QuoteRequest`] to the worker pools and returns the best [`Quote`].
718    ///
719    /// # Errors
720    ///
721    /// Returns [`SolveError`] if all pools fail or the router timeout elapses.
722    pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
723        self.router.quote(request).await
724    }
725
726    /// Waits until the solver is ready to answer quotes.
727    ///
728    /// Ready means:
729    /// - The Tycho feed has delivered at least one market snapshot.
730    /// - The computation manager has completed at least one derived-data cycle (spot prices, pool
731    ///   depths, token gas prices).
732    ///
733    /// The method polls every 500 ms and returns as soon as both conditions are
734    /// met, or returns [`WaitReadyError`] if `timeout` elapses first.
735    ///
736    /// # Example
737    ///
738    /// ```ignore
739    /// solver.wait_until_ready(Duration::from_secs(180)).await?;
740    /// ```
741    pub async fn wait_until_ready(&self, timeout: Duration) -> Result<(), WaitReadyError> {
742        const POLL_INTERVAL: Duration = Duration::from_millis(500);
743
744        let deadline = tokio::time::Instant::now() + timeout;
745
746        loop {
747            let market_ready = self
748                .market_data
749                .read()
750                .await
751                .last_updated()
752                .is_some();
753            let derived_ready = self
754                .derived_data
755                .read()
756                .await
757                .derived_data_ready();
758
759            if market_ready && derived_ready {
760                return Ok(());
761            }
762
763            if tokio::time::Instant::now() >= deadline {
764                return Err(WaitReadyError { timeout_ms: timeout.as_millis() as u64 });
765            }
766
767            tokio::time::sleep(POLL_INTERVAL).await;
768        }
769    }
770
771    /// Signals all worker pools and the computation manager to stop, then aborts background tasks.
772    pub fn shutdown(self) {
773        let _ = self.computation_shutdown_tx.send(());
774        for pool in self.worker_pools {
775            pool.shutdown();
776        }
777        self.feed_handle.abort();
778        self.gas_price_handle.abort();
779    }
780
781    /// Consumes the solver into its raw parts for callers that add their own layer.
782    pub fn into_parts(self) -> SolverParts {
783        SolverParts {
784            router: self.router,
785            worker_pools: self.worker_pools,
786            market_data: self.market_data,
787            derived_data: self.derived_data,
788            feed_handle: self.feed_handle,
789            gas_price_handle: self.gas_price_handle,
790            computation_handle: self.computation_handle,
791            computation_shutdown_tx: self.computation_shutdown_tx,
792            chain: self.chain,
793            router_address: self.router_address,
794        }
795    }
796}
797
798/// Raw components of a [`Solver`], for callers adding their own layer (e.g., an HTTP server).
799///
800/// Obtained via [`Solver::into_parts`].
801pub struct SolverParts {
802    /// Routes quote requests across worker pools.
803    router: WorkerPoolRouter,
804    /// One [`WorkerPool`] per configured algorithm pool.
805    worker_pools: Vec<WorkerPool>,
806    /// Live market snapshot shared across all components.
807    market_data: SharedMarketDataRef,
808    /// Derived on-chain data (spot prices, depths, gas costs) shared across all components.
809    derived_data: SharedDerivedDataRef,
810    /// Background task running the Tycho market-data feed.
811    feed_handle: JoinHandle<()>,
812    /// Background task polling the RPC node for gas prices.
813    gas_price_handle: JoinHandle<()>,
814    /// Background task running the computation manager.
815    computation_handle: JoinHandle<()>,
816    /// Send a unit value on this channel to trigger a graceful computation-manager shutdown.
817    computation_shutdown_tx: broadcast::Sender<()>,
818    /// Chain this solver is configured for.
819    chain: Chain,
820    /// Address of the Tycho Router contract on this chain.
821    router_address: Bytes,
822}
823
824impl SolverParts {
825    /// Returns the chain this solver is configured for.
826    pub fn chain(&self) -> Chain {
827        self.chain
828    }
829
830    /// Returns the Tycho Router contract address for this chain.
831    pub fn router_address(&self) -> &Bytes {
832        &self.router_address
833    }
834
835    /// Returns a reference to the worker pools.
836    pub fn worker_pools(&self) -> &[WorkerPool] {
837        &self.worker_pools
838    }
839
840    /// Returns a reference to the shared market data.
841    pub fn market_data(&self) -> &SharedMarketDataRef {
842        &self.market_data
843    }
844
845    /// Returns a reference to the shared derived data.
846    pub fn derived_data(&self) -> &SharedDerivedDataRef {
847        &self.derived_data
848    }
849
850    /// Consumes the parts and returns the router.
851    pub fn into_router(self) -> WorkerPoolRouter {
852        self.router
853    }
854
855    /// Consumes the parts, returning all owned components.
856    #[allow(clippy::type_complexity)]
857    pub fn into_components(
858        self,
859    ) -> (
860        WorkerPoolRouter,
861        Vec<WorkerPool>,
862        SharedMarketDataRef,
863        SharedDerivedDataRef,
864        JoinHandle<()>,
865        JoinHandle<()>,
866        JoinHandle<()>,
867        broadcast::Sender<()>,
868    ) {
869        (
870            self.router,
871            self.worker_pools,
872            self.market_data,
873            self.derived_data,
874            self.feed_handle,
875            self.gas_price_handle,
876            self.computation_handle,
877            self.computation_shutdown_tx,
878        )
879    }
880}