Skip to main content

fynd_core/
solver.rs

1//! High-level solver setup via [`FyndBuilder`].
2//!
3//! [`FyndBuilder`] assembles the full Tycho feed + gas fetcher + computation
4//! manager + one or more worker pools + encoder + router pipeline with sensible
5//! defaults. For simple cases a single call chain is all that's needed:
6//!
7//! ```ignore
8//! let solver = FyndBuilder::new(chain, tycho_url, rpc_url, protocols, min_tvl)
9//!     .tycho_api_key(key)
10//!     .algorithm("most_liquid")
11//!     .build()?;
12//! ```
13use std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration};
14
15use num_cpus;
16use serde::{Deserialize, Serialize};
17use tokio::{sync::broadcast, task::JoinHandle};
18use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
19use tycho_simulation::{
20    tycho_common::{models::Chain, Bytes},
21    tycho_core::models::Address,
22    tycho_ethereum::rpc::EthereumRpcClient,
23};
24
25use crate::{
26    algorithm::{AlgorithmConfig, AlgorithmError},
27    derived::{ComputationManager, ComputationManagerConfig, SharedDerivedDataRef},
28    encoding::encoder::Encoder,
29    feed::{
30        events::MarketEventHandler, gas::GasPriceFetcher, market_data::MarketData,
31        tycho_feed::TychoFeed, TychoFeedConfig,
32    },
33    graph::EdgeWeightUpdaterWithDerived,
34    price_guard::{
35        guard::PriceGuard, provider::PriceProvider, provider_registry::PriceProviderRegistry,
36    },
37    types::constants::native_token,
38    worker_pool::{
39        pool::{WorkerPool, WorkerPoolBuilder},
40        registry::UnknownAlgorithmError,
41    },
42    worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter},
43    Algorithm, Quote, QuoteRequest, SolveError,
44};
45
/// Default values for [`FyndBuilder`] configuration and [`PoolConfig`] deserialization.
///
/// These are the single source of truth for all tunable defaults. Downstream
/// crates (e.g. `fynd-rpc`) should re-export or reference these rather than
/// redeclaring their own copies.
///
/// The `POOL_*` constants seed [`PoolConfig::new`], the serde field defaults of
/// [`PoolConfig`], and the shorthand pool methods on [`FyndBuilder`]. The remaining
/// constants seed the optional settings in [`FyndBuilder::new`].
pub mod defaults {
    use std::time::Duration;

    /// Minimum token quality score required for a token to be included in routing.
    pub const MIN_TOKEN_QUALITY: i32 = 100;
    /// Maximum age (in days) of trading history required for a token to be considered liquid.
    pub const TRADED_N_DAYS_AGO: u64 = 3;
    /// Multiplier applied to a pool's TVL when estimating available liquidity.
    pub const TVL_BUFFER_RATIO: f64 = 1.1;
    /// How often the gas price is refreshed from the RPC node.
    pub const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
    /// Delay before reconnecting to the Tycho feed after a disconnect.
    pub const RECONNECT_DELAY: Duration = Duration::from_secs(5);
    /// Minimum number of solver pool responses required before returning a quote (`0` = wait for
    /// all).
    pub const ROUTER_MIN_RESPONSES: usize = 0;
    /// Capacity of the task queue for each worker pool.
    pub const POOL_TASK_QUEUE_CAPACITY: usize = 1000;
    /// Minimum number of hops allowed in a route.
    pub const POOL_MIN_HOPS: usize = 1;
    /// Maximum number of hops allowed in a route.
    pub const POOL_MAX_HOPS: usize = 3;
    /// Per-pool solve timeout in milliseconds.
    pub const POOL_TIMEOUT_MS: u64 = 100;
}
76
// Internal-only defaults not shared with downstream crates.
/// Initial value of the builder's `tycho_use_tls` setting (see [`FyndBuilder::new`]).
const DEFAULT_TYCHO_USE_TLS: bool = true;
/// Depth slippage threshold handed to `ComputationManagerConfig` in [`FyndBuilder::build`].
/// `build` always uses this constant directly.
const DEFAULT_DEPTH_SLIPPAGE_THRESHOLD: f64 = 0.01;
/// Generous router timeout for standalone (non-server) use. HTTP services should
/// override this to a tighter value appropriate for their SLA.
const DEFAULT_ROUTER_TIMEOUT: Duration = Duration::from_secs(10);
83
// serde requires free functions for `#[serde(default = "...")]` — these delegate to the
// defaults module so both deserialization and the builder stay in sync.

/// Serde default for `PoolConfig::task_queue_capacity`.
fn default_task_queue_capacity() -> usize {
    defaults::POOL_TASK_QUEUE_CAPACITY
}

/// Serde default for `PoolConfig::min_hops`.
fn default_min_hops() -> usize {
    defaults::POOL_MIN_HOPS
}

/// Serde default for `PoolConfig::max_hops`.
fn default_max_hops() -> usize {
    defaults::POOL_MAX_HOPS
}

/// Serde default for `PoolConfig::timeout_ms` (milliseconds).
fn default_algo_timeout_ms() -> u64 {
    defaults::POOL_TIMEOUT_MS
}
101
102fn parse_connector_tokens(
103    raw: Option<&[String]>,
104) -> Result<Option<HashSet<Address>>, SolverBuildError> {
105    let Some(strings) = raw else {
106        return Ok(None);
107    };
108    let mut set = HashSet::with_capacity(strings.len());
109    for s in strings {
110        let addr = Address::from_str(s).map_err(|e| AlgorithmError::InvalidConfiguration {
111            reason: format!("connector_tokens: invalid address {s:?}: {e}"),
112        })?;
113        set.insert(addr);
114    }
115    Ok(Some(set))
116}
117
/// Per-pool configuration for [`FyndBuilder::add_pool`].
///
/// Only `algorithm` is required when deserializing; every other field falls back to the
/// serde default named on its attribute.
#[must_use]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolConfig {
    /// Algorithm name for this pool (e.g., `"most_liquid"`).
    algorithm: String,
    /// Number of worker threads for this pool. Defaults to `num_cpus::get()`.
    #[serde(default = "num_cpus::get")]
    num_workers: usize,
    /// Task queue capacity for this pool.
    #[serde(default = "default_task_queue_capacity")]
    task_queue_capacity: usize,
    /// Minimum hops to search (must be >= 1).
    #[serde(default = "default_min_hops")]
    min_hops: usize,
    /// Maximum hops to search.
    #[serde(default = "default_max_hops")]
    max_hops: usize,
    /// Timeout for solving in milliseconds.
    #[serde(default = "default_algo_timeout_ms")]
    timeout_ms: u64,
    /// Maximum number of paths to simulate per solve. `None` simulates all scored paths.
    #[serde(default)]
    max_routes: Option<usize>,
    /// Lowercase hex addresses (e.g. `"0xc02aaa…"`) allowed as intermediate routing hops.
    /// Absent = no restriction. Typically 3–10 entries (e.g. WETH, USDC, USDT, DAI).
    ///
    /// Stored as raw strings here; they are parsed into addresses by
    /// [`FyndBuilder::add_pool`].
    #[serde(default)]
    connector_tokens: Option<Vec<String>>,
}
147
148impl PoolConfig {
149    /// Creates a new pool config with the given algorithm name and defaults for all other fields.
150    pub fn new(algorithm: impl Into<String>) -> Self {
151        Self {
152            algorithm: algorithm.into(),
153            num_workers: num_cpus::get(),
154            task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
155            min_hops: defaults::POOL_MIN_HOPS,
156            max_hops: defaults::POOL_MAX_HOPS,
157            timeout_ms: defaults::POOL_TIMEOUT_MS,
158            max_routes: None,
159            connector_tokens: None,
160        }
161    }
162
163    /// Returns the algorithm name.
164    pub fn algorithm(&self) -> &str {
165        &self.algorithm
166    }
167
168    /// Returns the number of worker threads.
169    pub fn num_workers(&self) -> usize {
170        self.num_workers
171    }
172
173    /// Sets the number of worker threads.
174    pub fn with_num_workers(mut self, num_workers: usize) -> Self {
175        self.num_workers = num_workers;
176        self
177    }
178
179    /// Sets the task queue capacity.
180    pub fn with_task_queue_capacity(mut self, task_queue_capacity: usize) -> Self {
181        self.task_queue_capacity = task_queue_capacity;
182        self
183    }
184
185    /// Sets the minimum hops.
186    pub fn with_min_hops(mut self, min_hops: usize) -> Self {
187        self.min_hops = min_hops;
188        self
189    }
190
191    /// Sets the maximum hops.
192    pub fn with_max_hops(mut self, max_hops: usize) -> Self {
193        self.max_hops = max_hops;
194        self
195    }
196
197    /// Sets the timeout in milliseconds.
198    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
199        self.timeout_ms = timeout_ms;
200        self
201    }
202
203    /// Sets the maximum number of routes to simulate.
204    pub fn with_max_routes(mut self, max_routes: Option<usize>) -> Self {
205        self.max_routes = max_routes;
206        self
207    }
208
209    /// Returns the task queue capacity.
210    pub fn task_queue_capacity(&self) -> usize {
211        self.task_queue_capacity
212    }
213
214    /// Returns the minimum hops.
215    pub fn min_hops(&self) -> usize {
216        self.min_hops
217    }
218
219    /// Returns the maximum hops.
220    pub fn max_hops(&self) -> usize {
221        self.max_hops
222    }
223
224    /// Returns the timeout in milliseconds.
225    pub fn timeout_ms(&self) -> u64 {
226        self.timeout_ms
227    }
228
229    /// Returns the maximum number of routes to simulate.
230    pub fn max_routes(&self) -> Option<usize> {
231        self.max_routes
232    }
233
234    /// Restricts intermediate hops to the given token addresses (hex strings with or without `0x`
235    /// prefix). Absent = no restriction.
236    pub fn with_connector_tokens(mut self, tokens: Vec<String>) -> Self {
237        self.connector_tokens = Some(tokens);
238        self
239    }
240
241    /// Returns the raw connector token address strings, or `None` if unrestricted.
242    pub fn connector_tokens(&self) -> Option<&[String]> {
243        self.connector_tokens.as_deref()
244    }
245}
246
/// Error returned by [`Solver::wait_until_ready`].
///
/// Raised when the timeout elapses before both market data and derived data are ready.
#[derive(Debug, thiserror::Error)]
#[error("timed out after {timeout_ms}ms waiting for market data and derived computations")]
pub struct WaitReadyError {
    /// The timeout that was requested, in milliseconds (used in the error message).
    timeout_ms: u64,
}
253
/// Error returned by [`FyndBuilder::build`].
///
/// [`FyndBuilder::add_pool`] also returns this type (for invalid connector token addresses).
/// Marked `#[non_exhaustive]`, so downstream `match`es need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum SolverBuildError {
    /// The Ethereum RPC client could not be created (e.g. malformed URL).
    #[error("failed to create ethereum RPC client: {0}")]
    RpcClient(String),
    /// An invalid algorithm configuration was supplied.
    #[error(transparent)]
    AlgorithmConfig(#[from] AlgorithmError),
    /// The [`ComputationManager`] failed to initialise.
    #[error("failed to create computation manager: {0}")]
    ComputationManager(String),
    /// The swap encoder could not be created for the target chain.
    #[error("failed to create encoder: {0}")]
    Encoder(String),
    /// A pool referenced an algorithm name that is not registered.
    #[error(transparent)]
    UnknownAlgorithm(#[from] UnknownAlgorithmError),
    /// No native gas token is defined for the requested chain.
    #[error("gas token not configured for chain")]
    GasToken,
    /// [`FyndBuilder::build`] was called without configuring any worker pools.
    #[error("no worker pools configured")]
    NoPools,
}
280
/// Internal pool entry — either a built-in algorithm (by name) or a custom one.
///
/// Entries are queued by the builder's pool methods and turned into worker pools in
/// [`FyndBuilder::build`].
enum PoolEntry {
    /// Pool running a built-in algorithm selected by name.
    BuiltIn {
        /// Name given to the worker pool.
        name: String,
        /// Built-in algorithm name passed to the worker pool builder.
        algorithm: String,
        /// Number of worker threads.
        num_workers: usize,
        /// Capacity of the pool's task queue.
        task_queue_capacity: usize,
        /// Minimum hops for the algorithm config.
        min_hops: usize,
        /// Maximum hops for the algorithm config.
        max_hops: usize,
        /// Solve timeout in milliseconds.
        timeout_ms: u64,
        /// Optional cap on the number of routes.
        max_routes: Option<usize>,
        /// Already-parsed connector token addresses; `None` applies no restriction.
        connector_tokens: Option<HashSet<Address>>,
    },
    /// Pool running a caller-supplied [`Algorithm`] implementation.
    Custom(CustomPoolEntry),
}
296
/// Pool entry backed by a custom [`Algorithm`] implementation.
///
/// Created by [`FyndBuilder::with_algorithm`]. Unlike the built-in entry it carries no
/// connector-token restriction.
struct CustomPoolEntry {
    /// Name given to the worker pool.
    name: String,
    /// Number of worker threads.
    num_workers: usize,
    /// Capacity of the pool's task queue.
    task_queue_capacity: usize,
    /// Minimum hops for the algorithm config.
    min_hops: usize,
    /// Maximum hops for the algorithm config.
    max_hops: usize,
    /// Solve timeout in milliseconds.
    timeout_ms: u64,
    /// Optional cap on the number of routes.
    max_routes: Option<usize>,
    /// Applies the custom algorithm to a `WorkerPoolBuilder`.
    configure: Box<dyn FnOnce(WorkerPoolBuilder) -> WorkerPoolBuilder + Send>,
}
309
/// Builder for assembling the full solver pipeline.
///
/// Configures the Tycho market-data feed, gas price fetcher, derived-data
/// computation manager, one or more worker pools, encoder, and router.
#[must_use = "a builder does nothing until .build() is called"]
pub struct FyndBuilder {
    /// Target blockchain.
    chain: Chain,
    /// Tycho endpoint passed to the feed configuration.
    tycho_url: String,
    /// URL used to create the Ethereum RPC client.
    rpc_url: String,
    /// Protocols passed to the Tycho feed configuration.
    protocols: Vec<String>,
    /// Minimum TVL filter.
    min_tvl: f64,
    /// Optional Tycho API key.
    tycho_api_key: Option<String>,
    /// Whether the Tycho connection uses TLS.
    tycho_use_tls: bool,
    /// Minimum token quality score.
    min_token_quality: i32,
    /// Trading-recency filter, in days.
    traded_n_days_ago: u64,
    /// TVL buffer ratio passed to the feed configuration.
    tvl_buffer_ratio: f64,
    /// Gas price refresh interval.
    gas_refresh_interval: Duration,
    /// Delay before reconnecting to Tycho.
    reconnect_delay: Duration,
    /// Component IDs excluded from the Tycho stream.
    blocklisted_components: HashSet<String>,
    /// Worker router timeout.
    router_timeout: Duration,
    /// Minimum number of solver responses before the router returns early.
    router_min_responses: usize,
    /// Encoder override; `None` makes `build` create a default encoder for the chain.
    encoder: Option<Encoder>,
    /// Worker pools to create, in insertion order.
    pools: Vec<PoolEntry>,
    /// Whether the price guard is enabled (off by default).
    price_guard_enabled: bool,
    /// Price providers registered for the price guard.
    price_providers: Vec<Box<dyn PriceProvider>>,
}
335
336impl FyndBuilder {
337    /// Creates a new builder with the required parameters.
338    pub fn new(
339        chain: Chain,
340        tycho_url: impl Into<String>,
341        rpc_url: impl Into<String>,
342        protocols: Vec<String>,
343        min_tvl: f64,
344    ) -> Self {
345        Self {
346            chain,
347            tycho_url: tycho_url.into(),
348            rpc_url: rpc_url.into(),
349            protocols,
350            min_tvl,
351            tycho_api_key: None,
352            tycho_use_tls: DEFAULT_TYCHO_USE_TLS,
353            min_token_quality: defaults::MIN_TOKEN_QUALITY,
354            traded_n_days_ago: defaults::TRADED_N_DAYS_AGO,
355            tvl_buffer_ratio: defaults::TVL_BUFFER_RATIO,
356            gas_refresh_interval: defaults::GAS_REFRESH_INTERVAL,
357            reconnect_delay: defaults::RECONNECT_DELAY,
358            blocklisted_components: HashSet::new(),
359            router_timeout: DEFAULT_ROUTER_TIMEOUT,
360            router_min_responses: defaults::ROUTER_MIN_RESPONSES,
361            encoder: None,
362            pools: Vec::new(),
363            price_guard_enabled: false,
364            price_providers: Vec::new(),
365        }
366    }
367
368    /// The blockchain this builder is configured for.
369    pub fn chain(&self) -> Chain {
370        self.chain
371    }
372
373    /// Sets the Tycho API key.
374    pub fn tycho_api_key(mut self, key: impl Into<String>) -> Self {
375        self.tycho_api_key = Some(key.into());
376        self
377    }
378
379    /// Overrides the minimum TVL filter set in [`FyndBuilder::new`].
380    pub fn min_tvl(mut self, min_tvl: f64) -> Self {
381        self.min_tvl = min_tvl;
382        self
383    }
384
385    /// Enables or disables TLS for the Tycho WebSocket connection (default: `true`).
386    pub fn tycho_use_tls(mut self, use_tls: bool) -> Self {
387        self.tycho_use_tls = use_tls;
388        self
389    }
390
391    /// Sets the minimum token quality score; tokens below this threshold are excluded (default:
392    /// 100).
393    pub fn min_token_quality(mut self, quality: i32) -> Self {
394        self.min_token_quality = quality;
395        self
396    }
397
398    /// Filters out pools whose last trade is older than `days` days (default: 3).
399    pub fn traded_n_days_ago(mut self, days: u64) -> Self {
400        self.traded_n_days_ago = days;
401        self
402    }
403
404    /// Multiplies reported TVL by `ratio` before applying the `min_tvl` filter (default: 1.1).
405    pub fn tvl_buffer_ratio(mut self, ratio: f64) -> Self {
406        self.tvl_buffer_ratio = ratio;
407        self
408    }
409
410    /// Sets how often the gas price is refreshed from the RPC node (default: 30 s).
411    pub fn gas_refresh_interval(mut self, interval: Duration) -> Self {
412        self.gas_refresh_interval = interval;
413        self
414    }
415
416    /// Sets the delay before reconnecting to Tycho after a disconnection (default: 5 s).
417    pub fn reconnect_delay(mut self, delay: Duration) -> Self {
418        self.reconnect_delay = delay;
419        self
420    }
421
422    /// Sets component IDs to exclude from the Tycho stream.
423    pub fn blocklisted_components(mut self, components: HashSet<String>) -> Self {
424        self.blocklisted_components = components;
425        self
426    }
427
428    /// Sets the worker router timeout (default: 10s).
429    pub fn worker_router_timeout(mut self, timeout: Duration) -> Self {
430        self.router_timeout = timeout;
431        self
432    }
433
434    /// Sets the minimum number of solver responses before early return (default: 0).
435    pub fn worker_router_min_responses(mut self, min: usize) -> Self {
436        self.router_min_responses = min;
437        self
438    }
439
440    /// Overrides the default encoder.
441    pub fn encoder(mut self, encoder: Encoder) -> Self {
442        self.encoder = Some(encoder);
443        self
444    }
445
446    /// Shorthand: adds a single pool named `"default"` using a built-in algorithm by name.
447    pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
448        self.pools.push(PoolEntry::BuiltIn {
449            name: "default".to_string(),
450            algorithm: algorithm.into(),
451            num_workers: num_cpus::get(),
452            task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
453            min_hops: defaults::POOL_MIN_HOPS,
454            max_hops: defaults::POOL_MAX_HOPS,
455            timeout_ms: defaults::POOL_TIMEOUT_MS,
456            max_routes: None,
457            connector_tokens: None,
458        });
459        self
460    }
461
462    /// Shorthand: adds a single pool with a custom [`Algorithm`] implementation.
463    ///
464    /// The `factory` closure is called once per worker thread.
465    pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
466    where
467        A: Algorithm + 'static,
468        A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
469        F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
470    {
471        let name = name.into();
472        let algo_name = name.clone();
473        let configure =
474            Box::new(move |builder: WorkerPoolBuilder| builder.with_algorithm(algo_name, factory));
475        self.pools
476            .push(PoolEntry::Custom(CustomPoolEntry {
477                name,
478                num_workers: num_cpus::get(),
479                task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
480                min_hops: defaults::POOL_MIN_HOPS,
481                max_hops: defaults::POOL_MAX_HOPS,
482                timeout_ms: defaults::POOL_TIMEOUT_MS,
483                max_routes: None,
484                configure,
485            }));
486        self
487    }
488
489    /// Registers the built-in price providers (Hyperliquid + Binance).
490    ///
491    /// Called automatically during [`build`](Self::build) if no providers have been
492    /// registered and the price guard is not disabled. To use only custom
493    /// providers, call [`register_price_provider`](Self::register_price_provider)
494    /// before `build()` and the defaults will be skipped.
495    pub fn add_default_price_providers(self) -> Self {
496        self.register_price_provider(Box::new(
497            crate::price_guard::hyperliquid::HyperliquidProvider::default(),
498        ))
499        .register_price_provider(Box::new(
500            crate::price_guard::binance_ws::BinanceWsProvider::default(),
501        ))
502    }
503
504    /// Registers a custom price provider for the price guard.
505    ///
506    /// The provider's [`start`](PriceProvider::start) method is called during
507    /// [`build`](Self::build) with the shared market data.
508    pub fn register_price_provider(mut self, provider: Box<dyn PriceProvider>) -> Self {
509        self.price_providers.push(provider);
510        self
511    }
512
513    /// Enables or disables the price guard.
514    ///
515    /// When enabled, providers are started and caches stay warm. Validation
516    /// only runs for requests where the client sets `enabled: true` in
517    /// `PriceGuardConfig`. When disabled, no providers are started and
518    /// per-request attempts to use the guard return an error.
519    pub fn price_guard_enabled(mut self, enabled: bool) -> Self {
520        self.price_guard_enabled = enabled;
521        self
522    }
523
524    /// Adds a named pool using the given [`PoolConfig`].
525    ///
526    /// # Errors
527    ///
528    /// Returns [`SolverBuildError::AlgorithmConfig`] if any address in `connector_tokens` is not
529    /// valid hex.
530    pub fn add_pool(
531        mut self,
532        name: impl Into<String>,
533        config: &PoolConfig,
534    ) -> Result<Self, SolverBuildError> {
535        let connector_tokens = parse_connector_tokens(config.connector_tokens())?;
536        self.pools.push(PoolEntry::BuiltIn {
537            name: name.into(),
538            algorithm: config.algorithm().to_string(),
539            num_workers: config.num_workers(),
540            task_queue_capacity: config.task_queue_capacity(),
541            min_hops: config.min_hops(),
542            max_hops: config.max_hops(),
543            timeout_ms: config.timeout_ms(),
544            max_routes: config.max_routes(),
545            connector_tokens,
546        });
547        Ok(self)
548    }
549
550    /// Assembles and starts all solver components.
551    ///
552    /// # Errors
553    ///
554    /// Returns [`SolverBuildError`] if any component fails to initialize.
555    pub fn build(mut self) -> Result<Solver, SolverBuildError> {
556        if self.pools.is_empty() {
557            return Err(SolverBuildError::NoPools);
558        }
559
560        // Add built-in providers if none were explicitly registered.
561        if self.price_providers.is_empty() {
562            self = self.add_default_price_providers();
563        }
564
565        let market_data = MarketData::new_shared();
566
567        let tycho_feed_config = TychoFeedConfig::new(
568            self.tycho_url,
569            self.chain,
570            self.tycho_api_key,
571            self.tycho_use_tls,
572            self.protocols,
573            self.min_tvl,
574        )
575        .tvl_buffer_ratio(self.tvl_buffer_ratio)
576        .gas_refresh_interval(self.gas_refresh_interval)
577        .reconnect_delay(self.reconnect_delay)
578        .min_token_quality(self.min_token_quality)
579        .traded_n_days_ago(self.traded_n_days_ago)
580        .blocklisted_components(self.blocklisted_components);
581
582        let ethereum_client = EthereumRpcClient::new(self.rpc_url.as_str())
583            .map_err(|e| SolverBuildError::RpcClient(e.to_string()))?;
584
585        let (mut gas_price_fetcher, gas_price_worker_signal_tx) =
586            GasPriceFetcher::new(ethereum_client, market_data.clone());
587
588        let mut tycho_feed = TychoFeed::new(tycho_feed_config, market_data.clone());
589        tycho_feed = tycho_feed.with_gas_price_worker_signal_tx(gas_price_worker_signal_tx);
590
591        let gas_token = native_token(&self.chain).map_err(|_| SolverBuildError::GasToken)?;
592        let computation_config = ComputationManagerConfig::new()
593            .with_gas_token(gas_token)
594            .with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
595        // ComputationManager::new returns a broadcast receiver that we don't need here —
596        // workers subscribe via computation_manager.event_sender() below.
597        let (computation_manager, _) =
598            ComputationManager::new(computation_config, market_data.clone())
599                .map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;
600
601        let derived_data: SharedDerivedDataRef = computation_manager.store();
602        let derived_event_tx = computation_manager.event_sender();
603
604        // Subscribe event channels before spawning (one for computation manager + one per pool)
605        let computation_event_rx = tycho_feed.subscribe();
606        let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);
607
608        let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
609        let mut worker_pools: Vec<WorkerPool> = Vec::new();
610
611        for pool_entry in self.pools {
612            let pool_event_rx = tycho_feed.subscribe();
613            let derived_rx = derived_event_tx.subscribe();
614
615            let (worker_pool, task_handle) = match pool_entry {
616                PoolEntry::BuiltIn {
617                    name,
618                    algorithm,
619                    num_workers,
620                    task_queue_capacity,
621                    min_hops,
622                    max_hops,
623                    timeout_ms,
624                    max_routes,
625                    connector_tokens,
626                } => {
627                    let mut algo_cfg = AlgorithmConfig::new(
628                        min_hops,
629                        max_hops,
630                        Duration::from_millis(timeout_ms),
631                        max_routes,
632                    )?;
633                    if let Some(tokens) = connector_tokens {
634                        algo_cfg = algo_cfg.with_connector_tokens(tokens);
635                    }
636                    WorkerPoolBuilder::new()
637                        .name(name)
638                        .algorithm(algorithm)
639                        .algorithm_config(algo_cfg)
640                        .num_workers(num_workers)
641                        .task_queue_capacity(task_queue_capacity)
642                        .build(
643                            market_data.clone(),
644                            Arc::clone(&derived_data),
645                            pool_event_rx,
646                            derived_rx,
647                        )?
648                }
649                PoolEntry::Custom(custom) => {
650                    let algo_cfg = AlgorithmConfig::new(
651                        custom.min_hops,
652                        custom.max_hops,
653                        Duration::from_millis(custom.timeout_ms),
654                        custom.max_routes,
655                    )?;
656                    let builder = WorkerPoolBuilder::new()
657                        .name(custom.name)
658                        .algorithm_config(algo_cfg)
659                        .num_workers(custom.num_workers)
660                        .task_queue_capacity(custom.task_queue_capacity);
661                    let builder = (custom.configure)(builder);
662                    builder.build(
663                        market_data.clone(),
664                        Arc::clone(&derived_data),
665                        pool_event_rx,
666                        derived_rx,
667                    )?
668                }
669            };
670
671            solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
672            worker_pools.push(worker_pool);
673        }
674
675        let encoder = match self.encoder {
676            Some(enc) => enc,
677            None => {
678                let registry = SwapEncoderRegistry::new(self.chain)
679                    .add_default_encoders(None)
680                    .map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
681                Encoder::new(self.chain, registry)
682                    .map_err(|e| SolverBuildError::Encoder(e.to_string()))?
683            }
684        };
685
686        let chain = self.chain;
687        let router_address = encoder.router_address().clone();
688
689        // Only start price providers when the guard is enabled.
690        // When disabled, per-request attempts to enable the guard return an error.
691        let router_config = WorkerPoolRouterConfig::default()
692            .with_timeout(self.router_timeout)
693            .with_min_responses(self.router_min_responses);
694        let mut router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);
695
696        if self.price_guard_enabled {
697            let mut registry = PriceProviderRegistry::new();
698            let mut worker_handles = Vec::new();
699            for mut provider in self.price_providers {
700                worker_handles.push(provider.start(market_data.clone()));
701                registry = registry.register(provider);
702            }
703            let price_guard = PriceGuard::new(registry, worker_handles);
704            router = router.with_price_guard(price_guard);
705        }
706
707        let feed_handle = tokio::spawn(async move {
708            if let Err(e) = tycho_feed.run().await {
709                tracing::error!(error = %e, "tycho feed error");
710            }
711        });
712
713        let gas_price_handle = tokio::spawn(async move {
714            if let Err(e) = gas_price_fetcher.run().await {
715                tracing::error!(error = %e, "gas price fetcher error");
716            }
717        });
718
719        let computation_handle = tokio::spawn(async move {
720            computation_manager
721                .run(computation_event_rx, computation_shutdown_rx)
722                .await;
723        });
724
725        Ok(Solver {
726            router,
727            worker_pools,
728            market_data,
729            derived_data,
730            feed_handle,
731            gas_price_handle,
732            computation_handle,
733            computation_shutdown_tx,
734            chain,
735            router_address,
736        })
737    }
738}
739
/// A running solver assembled by [`FyndBuilder`].
///
/// Owns the router, the worker pools and the handles of the background tasks spawned by
/// [`FyndBuilder::build`]. Stop it with [`Solver::shutdown`] or take it apart with
/// [`Solver::into_parts`].
pub struct Solver {
    /// Routes quote requests across worker pools.
    router: WorkerPoolRouter,
    /// One [`WorkerPool`] per configured pool.
    worker_pools: Vec<WorkerPool>,
    /// Market data shared across all components.
    market_data: MarketData,
    /// Derived data shared across all components.
    derived_data: SharedDerivedDataRef,
    /// Background task running the Tycho market-data feed.
    feed_handle: JoinHandle<()>,
    /// Background task running the gas price fetcher.
    gas_price_handle: JoinHandle<()>,
    /// Background task running the computation manager.
    computation_handle: JoinHandle<()>,
    /// Sending a unit value on this channel asks the computation manager to shut down.
    computation_shutdown_tx: broadcast::Sender<()>,
    /// Chain the solver was built for.
    chain: Chain,
    /// Router address taken from the encoder at build time.
    router_address: Bytes,
}
753
754impl Solver {
755    /// Returns a clone of the shared market data reference.
756    pub fn market_data(&self) -> MarketData {
757        self.market_data.clone()
758    }
759
760    /// Returns a clone of the shared derived data reference.
761    pub fn derived_data(&self) -> SharedDerivedDataRef {
762        Arc::clone(&self.derived_data)
763    }
764
765    /// Submits a [`QuoteRequest`] to the worker pools and returns the best [`Quote`].
766    ///
767    /// # Errors
768    ///
769    /// Returns [`SolveError`] if all pools fail or the router timeout elapses.
770    pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
771        self.router.quote(request).await
772    }
773
774    /// Waits until the solver is ready to answer quotes.
775    ///
776    /// Ready means:
777    /// - The Tycho feed has delivered at least one market snapshot.
778    /// - The computation manager has completed at least one derived-data cycle (spot prices, pool
779    ///   depths, token gas prices).
780    ///
781    /// The method polls every 500 ms and returns as soon as both conditions are
782    /// met, or returns [`WaitReadyError`] if `timeout` elapses first.
783    ///
784    /// # Example
785    ///
786    /// ```ignore
787    /// solver.wait_until_ready(Duration::from_secs(180)).await?;
788    /// ```
789    pub async fn wait_until_ready(&self, timeout: Duration) -> Result<(), WaitReadyError> {
790        const POLL_INTERVAL: Duration = Duration::from_millis(500);
791
792        let deadline = tokio::time::Instant::now() + timeout;
793
794        loop {
795            let market_ready = self
796                .market_data
797                .read()
798                .await
799                .last_updated()
800                .is_some();
801            let derived_ready = self
802                .derived_data
803                .read()
804                .await
805                .derived_data_ready();
806
807            if market_ready && derived_ready {
808                return Ok(());
809            }
810
811            if tokio::time::Instant::now() >= deadline {
812                return Err(WaitReadyError { timeout_ms: timeout.as_millis() as u64 });
813            }
814
815            tokio::time::sleep(POLL_INTERVAL).await;
816        }
817    }
818
819    /// Signals all worker pools and the computation manager to stop, then aborts background tasks.
820    pub fn shutdown(self) {
821        let _ = self.computation_shutdown_tx.send(());
822        for pool in self.worker_pools {
823            pool.shutdown();
824        }
825        self.feed_handle.abort();
826        self.gas_price_handle.abort();
827    }
828
829    /// Consumes the solver into its raw parts for callers that add their own layer.
830    pub fn into_parts(self) -> SolverParts {
831        SolverParts {
832            router: self.router,
833            worker_pools: self.worker_pools,
834            market_data: self.market_data,
835            derived_data: self.derived_data,
836            feed_handle: self.feed_handle,
837            gas_price_handle: self.gas_price_handle,
838            computation_handle: self.computation_handle,
839            computation_shutdown_tx: self.computation_shutdown_tx,
840            chain: self.chain,
841            router_address: self.router_address,
842        }
843    }
844}
845
846/// Raw components of a [`Solver`], for callers adding their own layer (e.g., an HTTP server).
847///
848/// Obtained via [`Solver::into_parts`].
849pub struct SolverParts {
850    /// Routes quote requests across worker pools.
851    router: WorkerPoolRouter,
852    /// One [`WorkerPool`] per configured algorithm pool.
853    worker_pools: Vec<WorkerPool>,
854    /// Live market snapshot shared across all components.
855    market_data: MarketData,
856    /// Derived on-chain data (spot prices, depths, gas costs) shared across all components.
857    derived_data: SharedDerivedDataRef,
858    /// Background task running the Tycho market-data feed.
859    feed_handle: JoinHandle<()>,
860    /// Background task polling the RPC node for gas prices.
861    gas_price_handle: JoinHandle<()>,
862    /// Background task running the computation manager.
863    computation_handle: JoinHandle<()>,
864    /// Send a unit value on this channel to trigger a graceful computation-manager shutdown.
865    computation_shutdown_tx: broadcast::Sender<()>,
866    /// Chain this solver is configured for.
867    chain: Chain,
868    /// Address of the Tycho Router contract on this chain.
869    router_address: Bytes,
870}
871
872impl SolverParts {
873    /// Returns the chain this solver is configured for.
874    pub fn chain(&self) -> Chain {
875        self.chain
876    }
877
878    /// Returns the Tycho Router contract address for this chain.
879    pub fn router_address(&self) -> &Bytes {
880        &self.router_address
881    }
882
883    /// Returns a reference to the worker pools.
884    pub fn worker_pools(&self) -> &[WorkerPool] {
885        &self.worker_pools
886    }
887
888    /// Returns a reference to the shared market data.
889    pub fn market_data(&self) -> &MarketData {
890        &self.market_data
891    }
892
893    /// Returns a reference to the shared derived data.
894    pub fn derived_data(&self) -> &SharedDerivedDataRef {
895        &self.derived_data
896    }
897
898    /// Consumes the parts and returns the router.
899    pub fn into_router(self) -> WorkerPoolRouter {
900        self.router
901    }
902
903    /// Consumes the parts, returning all owned components.
904    #[allow(clippy::type_complexity)]
905    pub fn into_components(
906        self,
907    ) -> (
908        WorkerPoolRouter,
909        Vec<WorkerPool>,
910        MarketData,
911        SharedDerivedDataRef,
912        JoinHandle<()>,
913        JoinHandle<()>,
914        JoinHandle<()>,
915        broadcast::Sender<()>,
916    ) {
917        (
918            self.router,
919            self.worker_pools,
920            self.market_data,
921            self.derived_data,
922            self.feed_handle,
923            self.gas_price_handle,
924            self.computation_handle,
925            self.computation_shutdown_tx,
926        )
927    }
928}