Skip to main content

fynd_core/
solver.rs

//! High-level solver setup via [`FyndBuilder`].
//!
//! [`FyndBuilder`] assembles the full Tycho feed + gas fetcher + computation
//! manager + one or more worker pools + encoder + router pipeline with sensible
//! defaults. For simple cases a single call chain is all that's needed:
//!
//! ```ignore
//! let solver = FyndBuilder::new(chain, tycho_url, rpc_url, protocols, min_tvl)
//!     .tycho_api_key(key)
//!     .algorithm("most_liquid")
//!     .build()?;
//! ```
13use std::{collections::HashSet, sync::Arc, time::Duration};
14
15use num_cpus;
16use serde::{Deserialize, Serialize};
17use tokio::{sync::broadcast, task::JoinHandle};
18use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
19use tycho_simulation::{
20    tycho_common::{models::Chain, Bytes},
21    tycho_ethereum::rpc::EthereumRpcClient,
22};
23
24use crate::{
25    algorithm::{AlgorithmConfig, AlgorithmError},
26    derived::{ComputationManager, ComputationManagerConfig, SharedDerivedDataRef},
27    encoding::encoder::Encoder,
28    feed::{
29        events::MarketEventHandler,
30        gas::GasPriceFetcher,
31        market_data::{SharedMarketData, SharedMarketDataRef},
32        tycho_feed::TychoFeed,
33        TychoFeedConfig,
34    },
35    graph::EdgeWeightUpdaterWithDerived,
36    types::constants::native_token,
37    worker_pool::{
38        pool::{WorkerPool, WorkerPoolBuilder},
39        registry::UnknownAlgorithmError,
40    },
41    worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter},
42    Algorithm, Quote, QuoteRequest, SolveError,
43};
44
/// Default values for [`FyndBuilder`] configuration and [`PoolConfig`] deserialization.
///
/// This module is the single source of truth for every tunable default. Downstream
/// crates (e.g. `fynd-rpc`) should re-export or reference these constants instead of
/// declaring their own copies.
pub mod defaults {
    use std::time::Duration;

    // ---- Market-data feed ----

    /// Default minimum token quality score; tokens scoring lower are excluded.
    pub const MIN_TOKEN_QUALITY: i32 = 100;
    /// Default age limit, in days, for a pool's last trade.
    pub const TRADED_N_DAYS_AGO: u64 = 3;
    /// Default TVL buffer ratio used together with the minimum-TVL filter.
    pub const TVL_BUFFER_RATIO: f64 = 1.1;
    /// Default interval between gas price refreshes.
    pub const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
    /// Default delay before reconnecting to Tycho after a disconnection.
    pub const RECONNECT_DELAY: Duration = Duration::from_secs(5);

    // ---- Router ----

    /// Default minimum number of solver responses before the router returns early.
    pub const ROUTER_MIN_RESPONSES: usize = 0;

    // ---- Worker pools ----

    /// Default task queue capacity of a worker pool.
    pub const POOL_TASK_QUEUE_CAPACITY: usize = 1000;
    /// Default minimum number of hops to search.
    pub const POOL_MIN_HOPS: usize = 1;
    /// Default maximum number of hops to search.
    pub const POOL_MAX_HOPS: usize = 3;
    /// Default per-solve timeout, in milliseconds.
    pub const POOL_TIMEOUT_MS: u64 = 100;
}
64
// Internal-only defaults; deliberately not shared with downstream crates.

/// Whether the Tycho connection uses TLS unless the builder is told otherwise.
const DEFAULT_TYCHO_USE_TLS: bool = true;
/// Depth slippage threshold passed to the computation manager configuration.
const DEFAULT_DEPTH_SLIPPAGE_THRESHOLD: f64 = 0.01;
/// Generous router timeout for standalone (non-server) use. HTTP services should
/// override this to a tighter value appropriate for their SLA.
const DEFAULT_ROUTER_TIMEOUT: Duration = Duration::from_secs(10);
71
72// serde requires free functions for `#[serde(default = "...")]` — these delegate to the
73// defaults module so both deserialization and the builder stay in sync.
74fn default_task_queue_capacity() -> usize {
75    defaults::POOL_TASK_QUEUE_CAPACITY
76}
77
78fn default_min_hops() -> usize {
79    defaults::POOL_MIN_HOPS
80}
81
82fn default_max_hops() -> usize {
83    defaults::POOL_MAX_HOPS
84}
85
86fn default_algo_timeout_ms() -> u64 {
87    defaults::POOL_TIMEOUT_MS
88}
89
90/// Per-pool configuration for [`FyndBuilder::add_pool`].
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct PoolConfig {
93    /// Algorithm name for this pool (e.g., `"most_liquid"`).
94    algorithm: String,
95    /// Number of worker threads for this pool.
96    #[serde(default = "num_cpus::get")]
97    num_workers: usize,
98    /// Task queue capacity for this pool.
99    #[serde(default = "default_task_queue_capacity")]
100    task_queue_capacity: usize,
101    /// Minimum hops to search (must be >= 1).
102    #[serde(default = "default_min_hops")]
103    min_hops: usize,
104    /// Maximum hops to search.
105    #[serde(default = "default_max_hops")]
106    max_hops: usize,
107    /// Timeout for solving in milliseconds.
108    #[serde(default = "default_algo_timeout_ms")]
109    timeout_ms: u64,
110    /// Maximum number of paths to simulate per solve. `None` simulates all scored paths.
111    #[serde(default)]
112    max_routes: Option<usize>,
113}
114
115impl PoolConfig {
116    /// Creates a new pool config with the given algorithm name and defaults for all other fields.
117    pub fn new(algorithm: impl Into<String>) -> Self {
118        Self {
119            algorithm: algorithm.into(),
120            num_workers: num_cpus::get(),
121            task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
122            min_hops: defaults::POOL_MIN_HOPS,
123            max_hops: defaults::POOL_MAX_HOPS,
124            timeout_ms: defaults::POOL_TIMEOUT_MS,
125            max_routes: None,
126        }
127    }
128
129    /// Returns the algorithm name.
130    pub fn algorithm(&self) -> &str {
131        &self.algorithm
132    }
133
134    /// Returns the number of worker threads.
135    pub fn num_workers(&self) -> usize {
136        self.num_workers
137    }
138
139    /// Sets the number of worker threads.
140    pub fn with_num_workers(mut self, num_workers: usize) -> Self {
141        self.num_workers = num_workers;
142        self
143    }
144
145    /// Sets the task queue capacity.
146    pub fn with_task_queue_capacity(mut self, task_queue_capacity: usize) -> Self {
147        self.task_queue_capacity = task_queue_capacity;
148        self
149    }
150
151    /// Sets the minimum hops.
152    pub fn with_min_hops(mut self, min_hops: usize) -> Self {
153        self.min_hops = min_hops;
154        self
155    }
156
157    /// Sets the maximum hops.
158    pub fn with_max_hops(mut self, max_hops: usize) -> Self {
159        self.max_hops = max_hops;
160        self
161    }
162
163    /// Sets the timeout in milliseconds.
164    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
165        self.timeout_ms = timeout_ms;
166        self
167    }
168
169    /// Sets the maximum number of routes to simulate.
170    pub fn with_max_routes(mut self, max_routes: Option<usize>) -> Self {
171        self.max_routes = max_routes;
172        self
173    }
174
175    /// Returns the task queue capacity.
176    pub fn task_queue_capacity(&self) -> usize {
177        self.task_queue_capacity
178    }
179
180    /// Returns the minimum hops.
181    pub fn min_hops(&self) -> usize {
182        self.min_hops
183    }
184
185    /// Returns the maximum hops.
186    pub fn max_hops(&self) -> usize {
187        self.max_hops
188    }
189
190    /// Returns the timeout in milliseconds.
191    pub fn timeout_ms(&self) -> u64 {
192        self.timeout_ms
193    }
194
195    /// Returns the maximum number of routes to simulate.
196    pub fn max_routes(&self) -> Option<usize> {
197        self.max_routes
198    }
199}
200
201/// Error returned by [`Solver::wait_until_ready`].
202#[derive(Debug, thiserror::Error)]
203#[error("timed out after {timeout_ms}ms waiting for market data and derived computations")]
204pub struct WaitReadyError {
205    timeout_ms: u64,
206}
207
208/// Error returned by [`FyndBuilder::build`].
209#[non_exhaustive]
210#[derive(Debug, thiserror::Error)]
211pub enum SolverBuildError {
212    /// The Ethereum RPC client could not be created (e.g. malformed URL).
213    #[error("failed to create ethereum RPC client: {0}")]
214    RpcClient(String),
215    /// An invalid algorithm configuration was supplied.
216    #[error(transparent)]
217    AlgorithmConfig(#[from] AlgorithmError),
218    /// The [`ComputationManager`] failed to initialise.
219    #[error("failed to create computation manager: {0}")]
220    ComputationManager(String),
221    /// The swap encoder could not be created for the target chain.
222    #[error("failed to create encoder: {0}")]
223    Encoder(String),
224    /// A pool referenced an algorithm name that is not registered.
225    #[error(transparent)]
226    UnknownAlgorithm(#[from] UnknownAlgorithmError),
227    /// No native gas token is defined for the requested chain.
228    #[error("gas token not configured for chain")]
229    GasToken,
230    /// [`FyndBuilder::build`] was called without configuring any worker pools.
231    #[error("no worker pools configured")]
232    NoPools,
233}
234
235/// Internal pool entry — either a built-in algorithm (by name) or a custom one.
236enum PoolEntry {
237    BuiltIn {
238        name: String,
239        algorithm: String,
240        num_workers: usize,
241        task_queue_capacity: usize,
242        min_hops: usize,
243        max_hops: usize,
244        timeout_ms: u64,
245        max_routes: Option<usize>,
246    },
247    Custom(CustomPoolEntry),
248}
249
250/// Pool entry backed by a custom [`Algorithm`] implementation.
251struct CustomPoolEntry {
252    name: String,
253    num_workers: usize,
254    task_queue_capacity: usize,
255    min_hops: usize,
256    max_hops: usize,
257    timeout_ms: u64,
258    max_routes: Option<usize>,
259    /// Applies the custom algorithm to a `WorkerPoolBuilder`.
260    configure: Box<dyn FnOnce(WorkerPoolBuilder) -> WorkerPoolBuilder + Send>,
261}
262
263/// Builder for assembling the full solver pipeline.
264#[must_use = "a builder does nothing until .build() is called"]
265/// Configures the Tycho market-data feed, gas price fetcher, derived-data
266/// computation manager, one or more worker pools, encoder, and router.
267pub struct FyndBuilder {
268    chain: Chain,
269    tycho_url: String,
270    rpc_url: String,
271    protocols: Vec<String>,
272    min_tvl: f64,
273    tycho_api_key: Option<String>,
274    tycho_use_tls: bool,
275    min_token_quality: i32,
276    traded_n_days_ago: u64,
277    tvl_buffer_ratio: f64,
278    gas_refresh_interval: Duration,
279    reconnect_delay: Duration,
280    blacklisted_components: HashSet<String>,
281    router_timeout: Duration,
282    router_min_responses: usize,
283    encoder: Option<Encoder>,
284    pools: Vec<PoolEntry>,
285}
286
287impl FyndBuilder {
288    /// Creates a new builder with the required parameters.
289    pub fn new(
290        chain: Chain,
291        tycho_url: impl Into<String>,
292        rpc_url: impl Into<String>,
293        protocols: Vec<String>,
294        min_tvl: f64,
295    ) -> Self {
296        Self {
297            chain,
298            tycho_url: tycho_url.into(),
299            rpc_url: rpc_url.into(),
300            protocols,
301            min_tvl,
302            tycho_api_key: None,
303            tycho_use_tls: DEFAULT_TYCHO_USE_TLS,
304            min_token_quality: defaults::MIN_TOKEN_QUALITY,
305            traded_n_days_ago: defaults::TRADED_N_DAYS_AGO,
306            tvl_buffer_ratio: defaults::TVL_BUFFER_RATIO,
307            gas_refresh_interval: defaults::GAS_REFRESH_INTERVAL,
308            reconnect_delay: defaults::RECONNECT_DELAY,
309            blacklisted_components: HashSet::new(),
310            router_timeout: DEFAULT_ROUTER_TIMEOUT,
311            router_min_responses: defaults::ROUTER_MIN_RESPONSES,
312            encoder: None,
313            pools: Vec::new(),
314        }
315    }
316
317    pub fn chain(&self) -> Chain {
318        self.chain
319    }
320
321    /// Sets the Tycho API key.
322    pub fn tycho_api_key(mut self, key: impl Into<String>) -> Self {
323        self.tycho_api_key = Some(key.into());
324        self
325    }
326
327    /// Overrides the minimum TVL filter set in [`FyndBuilder::new`].
328    pub fn min_tvl(mut self, min_tvl: f64) -> Self {
329        self.min_tvl = min_tvl;
330        self
331    }
332
333    /// Enables or disables TLS for the Tycho WebSocket connection (default: `true`).
334    pub fn tycho_use_tls(mut self, use_tls: bool) -> Self {
335        self.tycho_use_tls = use_tls;
336        self
337    }
338
339    /// Sets the minimum token quality score; tokens below this threshold are excluded (default:
340    /// 100).
341    pub fn min_token_quality(mut self, quality: i32) -> Self {
342        self.min_token_quality = quality;
343        self
344    }
345
346    /// Filters out pools whose last trade is older than `days` days (default: 3).
347    pub fn traded_n_days_ago(mut self, days: u64) -> Self {
348        self.traded_n_days_ago = days;
349        self
350    }
351
352    /// Multiplies reported TVL by `ratio` before applying the `min_tvl` filter (default: 1.1).
353    pub fn tvl_buffer_ratio(mut self, ratio: f64) -> Self {
354        self.tvl_buffer_ratio = ratio;
355        self
356    }
357
358    /// Sets how often the gas price is refreshed from the RPC node (default: 30 s).
359    pub fn gas_refresh_interval(mut self, interval: Duration) -> Self {
360        self.gas_refresh_interval = interval;
361        self
362    }
363
364    /// Sets the delay before reconnecting to Tycho after a disconnection (default: 5 s).
365    pub fn reconnect_delay(mut self, delay: Duration) -> Self {
366        self.reconnect_delay = delay;
367        self
368    }
369
370    /// Replaces the set of component addresses that are excluded from routing.
371    pub fn blacklisted_components(mut self, components: HashSet<String>) -> Self {
372        self.blacklisted_components = components;
373        self
374    }
375
376    /// Sets the worker router timeout (default: 10s).
377    pub fn worker_router_timeout(mut self, timeout: Duration) -> Self {
378        self.router_timeout = timeout;
379        self
380    }
381
382    /// Sets the minimum number of solver responses before early return (default: 0).
383    pub fn worker_router_min_responses(mut self, min: usize) -> Self {
384        self.router_min_responses = min;
385        self
386    }
387
388    /// Overrides the default encoder.
389    pub fn encoder(mut self, encoder: Encoder) -> Self {
390        self.encoder = Some(encoder);
391        self
392    }
393
394    /// Shorthand: adds a single pool named `"default"` using a built-in algorithm by name.
395    pub fn algorithm(mut self, algorithm: impl Into<String>) -> Self {
396        self.pools.push(PoolEntry::BuiltIn {
397            name: "default".to_string(),
398            algorithm: algorithm.into(),
399            num_workers: num_cpus::get(),
400            task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
401            min_hops: defaults::POOL_MIN_HOPS,
402            max_hops: defaults::POOL_MAX_HOPS,
403            timeout_ms: defaults::POOL_TIMEOUT_MS,
404            max_routes: None,
405        });
406        self
407    }
408
409    /// Shorthand: adds a single pool with a custom [`Algorithm`] implementation.
410    ///
411    /// The `factory` closure is called once per worker thread.
412    pub fn with_algorithm<A, F>(mut self, name: impl Into<String>, factory: F) -> Self
413    where
414        A: Algorithm + 'static,
415        A::GraphManager: MarketEventHandler + EdgeWeightUpdaterWithDerived + 'static,
416        F: Fn(AlgorithmConfig) -> A + Clone + Send + Sync + 'static,
417    {
418        let name = name.into();
419        let algo_name = name.clone();
420        let configure =
421            Box::new(move |builder: WorkerPoolBuilder| builder.with_algorithm(algo_name, factory));
422        self.pools
423            .push(PoolEntry::Custom(CustomPoolEntry {
424                name,
425                num_workers: num_cpus::get(),
426                task_queue_capacity: defaults::POOL_TASK_QUEUE_CAPACITY,
427                min_hops: defaults::POOL_MIN_HOPS,
428                max_hops: defaults::POOL_MAX_HOPS,
429                timeout_ms: defaults::POOL_TIMEOUT_MS,
430                max_routes: None,
431                configure,
432            }));
433        self
434    }
435
436    /// Adds a named pool using the given [`PoolConfig`].
437    pub fn add_pool(mut self, name: impl Into<String>, config: &PoolConfig) -> Self {
438        self.pools.push(PoolEntry::BuiltIn {
439            name: name.into(),
440            algorithm: config.algorithm().to_string(),
441            num_workers: config.num_workers(),
442            task_queue_capacity: config.task_queue_capacity(),
443            min_hops: config.min_hops(),
444            max_hops: config.max_hops(),
445            timeout_ms: config.timeout_ms(),
446            max_routes: config.max_routes(),
447        });
448        self
449    }
450
451    /// Assembles and starts all solver components.
452    ///
453    /// # Errors
454    ///
455    /// Returns [`SolverBuildError`] if any component fails to initialize.
456    pub fn build(self) -> Result<Solver, SolverBuildError> {
457        if self.pools.is_empty() {
458            return Err(SolverBuildError::NoPools);
459        }
460
461        let market_data = Arc::new(tokio::sync::RwLock::new(SharedMarketData::new()));
462
463        let tycho_feed_config = TychoFeedConfig::new(
464            self.tycho_url,
465            self.chain,
466            self.tycho_api_key,
467            self.tycho_use_tls,
468            self.protocols,
469            self.min_tvl,
470        )
471        .tvl_buffer_ratio(self.tvl_buffer_ratio)
472        .gas_refresh_interval(self.gas_refresh_interval)
473        .reconnect_delay(self.reconnect_delay)
474        .min_token_quality(self.min_token_quality)
475        .traded_n_days_ago(self.traded_n_days_ago)
476        .blacklisted_components(self.blacklisted_components);
477
478        let ethereum_client = EthereumRpcClient::new(self.rpc_url.as_str())
479            .map_err(|e| SolverBuildError::RpcClient(e.to_string()))?;
480
481        let (mut gas_price_fetcher, gas_price_worker_signal_tx) =
482            GasPriceFetcher::new(ethereum_client, Arc::clone(&market_data));
483
484        let mut tycho_feed = TychoFeed::new(tycho_feed_config, Arc::clone(&market_data));
485        tycho_feed = tycho_feed.with_gas_price_worker_signal_tx(gas_price_worker_signal_tx);
486
487        let gas_token = native_token(&self.chain).map_err(|_| SolverBuildError::GasToken)?;
488        let computation_config = ComputationManagerConfig::new()
489            .with_gas_token(gas_token)
490            .with_depth_slippage_threshold(DEFAULT_DEPTH_SLIPPAGE_THRESHOLD);
491        // ComputationManager::new returns a broadcast receiver that we don't need here —
492        // workers subscribe via computation_manager.event_sender() below.
493        let (computation_manager, _) =
494            ComputationManager::new(computation_config, Arc::clone(&market_data))
495                .map_err(|e| SolverBuildError::ComputationManager(e.to_string()))?;
496
497        let derived_data: SharedDerivedDataRef = computation_manager.store();
498        let derived_event_tx = computation_manager.event_sender();
499
500        // Subscribe event channels before spawning (one for computation manager + one per pool)
501        let computation_event_rx = tycho_feed.subscribe();
502        let (computation_shutdown_tx, computation_shutdown_rx) = broadcast::channel(1);
503
504        let mut solver_pool_handles: Vec<SolverPoolHandle> = Vec::new();
505        let mut worker_pools: Vec<WorkerPool> = Vec::new();
506
507        for pool_entry in self.pools {
508            let pool_event_rx = tycho_feed.subscribe();
509            let derived_rx = derived_event_tx.subscribe();
510
511            let (worker_pool, task_handle) = match pool_entry {
512                PoolEntry::BuiltIn {
513                    name,
514                    algorithm,
515                    num_workers,
516                    task_queue_capacity,
517                    min_hops,
518                    max_hops,
519                    timeout_ms,
520                    max_routes,
521                } => {
522                    let algo_cfg = AlgorithmConfig::new(
523                        min_hops,
524                        max_hops,
525                        Duration::from_millis(timeout_ms),
526                        max_routes,
527                    )?;
528                    WorkerPoolBuilder::new()
529                        .name(name)
530                        .algorithm(algorithm)
531                        .algorithm_config(algo_cfg)
532                        .num_workers(num_workers)
533                        .task_queue_capacity(task_queue_capacity)
534                        .build(
535                            Arc::clone(&market_data),
536                            Arc::clone(&derived_data),
537                            pool_event_rx,
538                            derived_rx,
539                        )?
540                }
541                PoolEntry::Custom(custom) => {
542                    let algo_cfg = AlgorithmConfig::new(
543                        custom.min_hops,
544                        custom.max_hops,
545                        Duration::from_millis(custom.timeout_ms),
546                        custom.max_routes,
547                    )?;
548                    let builder = WorkerPoolBuilder::new()
549                        .name(custom.name)
550                        .algorithm_config(algo_cfg)
551                        .num_workers(custom.num_workers)
552                        .task_queue_capacity(custom.task_queue_capacity);
553                    let builder = (custom.configure)(builder);
554                    builder.build(
555                        Arc::clone(&market_data),
556                        Arc::clone(&derived_data),
557                        pool_event_rx,
558                        derived_rx,
559                    )?
560                }
561            };
562
563            solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle));
564            worker_pools.push(worker_pool);
565        }
566
567        let encoder = match self.encoder {
568            Some(enc) => enc,
569            None => {
570                let registry = SwapEncoderRegistry::new(self.chain)
571                    .add_default_encoders(None)
572                    .map_err(|e| SolverBuildError::Encoder(e.to_string()))?;
573                Encoder::new(self.chain, registry)
574                    .map_err(|e| SolverBuildError::Encoder(e.to_string()))?
575            }
576        };
577
578        let chain = self.chain;
579        let router_address = encoder.router_address().clone();
580
581        let router_config = WorkerPoolRouterConfig::default()
582            .with_timeout(self.router_timeout)
583            .with_min_responses(self.router_min_responses);
584        let router = WorkerPoolRouter::new(solver_pool_handles, router_config, encoder);
585
586        let feed_handle = tokio::spawn(async move {
587            if let Err(e) = tycho_feed.run().await {
588                tracing::error!(error = %e, "tycho feed error");
589            }
590        });
591
592        let gas_price_handle = tokio::spawn(async move {
593            if let Err(e) = gas_price_fetcher.run().await {
594                tracing::error!(error = %e, "gas price fetcher error");
595            }
596        });
597
598        let computation_handle = tokio::spawn(async move {
599            computation_manager
600                .run(computation_event_rx, computation_shutdown_rx)
601                .await;
602        });
603
604        Ok(Solver {
605            router,
606            worker_pools,
607            market_data,
608            derived_data,
609            feed_handle,
610            gas_price_handle,
611            computation_handle,
612            computation_shutdown_tx,
613            chain,
614            router_address,
615        })
616    }
617}
618
619/// A running solver assembled by [`FyndBuilder`].
620pub struct Solver {
621    router: WorkerPoolRouter,
622    worker_pools: Vec<WorkerPool>,
623    market_data: SharedMarketDataRef,
624    derived_data: SharedDerivedDataRef,
625    feed_handle: JoinHandle<()>,
626    gas_price_handle: JoinHandle<()>,
627    computation_handle: JoinHandle<()>,
628    computation_shutdown_tx: broadcast::Sender<()>,
629    chain: Chain,
630    router_address: Bytes,
631}
632
633impl Solver {
634    /// Returns a clone of the shared market data reference.
635    pub fn market_data(&self) -> SharedMarketDataRef {
636        Arc::clone(&self.market_data)
637    }
638
639    /// Returns a clone of the shared derived data reference.
640    pub fn derived_data(&self) -> SharedDerivedDataRef {
641        Arc::clone(&self.derived_data)
642    }
643
644    /// Submits a [`QuoteRequest`] to the worker pools and returns the best [`Quote`].
645    ///
646    /// # Errors
647    ///
648    /// Returns [`SolveError`] if all pools fail or the router timeout elapses.
649    pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
650        self.router.quote(request).await
651    }
652
653    /// Waits until the solver is ready to answer quotes.
654    ///
655    /// Ready means:
656    /// - The Tycho feed has delivered at least one market snapshot.
657    /// - The computation manager has completed at least one derived-data cycle (spot prices, pool
658    ///   depths, token gas prices).
659    ///
660    /// The method polls every 500 ms and returns as soon as both conditions are
661    /// met, or returns [`WaitReadyError`] if `timeout` elapses first.
662    ///
663    /// # Example
664    ///
665    /// ```ignore
666    /// solver.wait_until_ready(Duration::from_secs(180)).await?;
667    /// ```
668    pub async fn wait_until_ready(&self, timeout: Duration) -> Result<(), WaitReadyError> {
669        const POLL_INTERVAL: Duration = Duration::from_millis(500);
670
671        let deadline = tokio::time::Instant::now() + timeout;
672
673        loop {
674            let market_ready = self
675                .market_data
676                .read()
677                .await
678                .last_updated()
679                .is_some();
680            let derived_ready = self
681                .derived_data
682                .read()
683                .await
684                .derived_data_ready();
685
686            if market_ready && derived_ready {
687                return Ok(());
688            }
689
690            if tokio::time::Instant::now() >= deadline {
691                return Err(WaitReadyError { timeout_ms: timeout.as_millis() as u64 });
692            }
693
694            tokio::time::sleep(POLL_INTERVAL).await;
695        }
696    }
697
698    /// Signals all worker pools and the computation manager to stop, then aborts background tasks.
699    pub fn shutdown(self) {
700        let _ = self.computation_shutdown_tx.send(());
701        for pool in self.worker_pools {
702            pool.shutdown();
703        }
704        self.feed_handle.abort();
705        self.gas_price_handle.abort();
706    }
707
708    /// Consumes the solver into its raw parts for callers that add their own layer.
709    pub fn into_parts(self) -> SolverParts {
710        SolverParts {
711            router: self.router,
712            worker_pools: self.worker_pools,
713            market_data: self.market_data,
714            derived_data: self.derived_data,
715            feed_handle: self.feed_handle,
716            gas_price_handle: self.gas_price_handle,
717            computation_handle: self.computation_handle,
718            computation_shutdown_tx: self.computation_shutdown_tx,
719            chain: self.chain,
720            router_address: self.router_address,
721        }
722    }
723}
724
725/// Raw components of a [`Solver`], for callers adding their own layer (e.g., an HTTP server).
726///
727/// Obtained via [`Solver::into_parts`].
728pub struct SolverParts {
729    /// Routes quote requests across worker pools.
730    router: WorkerPoolRouter,
731    /// One [`WorkerPool`] per configured algorithm pool.
732    worker_pools: Vec<WorkerPool>,
733    /// Live market snapshot shared across all components.
734    market_data: SharedMarketDataRef,
735    /// Derived on-chain data (spot prices, depths, gas costs) shared across all components.
736    derived_data: SharedDerivedDataRef,
737    /// Background task running the Tycho market-data feed.
738    feed_handle: JoinHandle<()>,
739    /// Background task polling the RPC node for gas prices.
740    gas_price_handle: JoinHandle<()>,
741    /// Background task running the computation manager.
742    computation_handle: JoinHandle<()>,
743    /// Send a unit value on this channel to trigger a graceful computation-manager shutdown.
744    computation_shutdown_tx: broadcast::Sender<()>,
745    /// Chain this solver is configured for.
746    chain: Chain,
747    /// Address of the Tycho Router contract on this chain.
748    router_address: Bytes,
749}
750
751impl SolverParts {
752    /// Returns the chain this solver is configured for.
753    pub fn chain(&self) -> Chain {
754        self.chain
755    }
756
757    /// Returns the Tycho Router contract address for this chain.
758    pub fn router_address(&self) -> &Bytes {
759        &self.router_address
760    }
761
762    /// Returns a reference to the worker pools.
763    pub fn worker_pools(&self) -> &[WorkerPool] {
764        &self.worker_pools
765    }
766
767    /// Returns a reference to the shared market data.
768    pub fn market_data(&self) -> &SharedMarketDataRef {
769        &self.market_data
770    }
771
772    /// Returns a reference to the shared derived data.
773    pub fn derived_data(&self) -> &SharedDerivedDataRef {
774        &self.derived_data
775    }
776
777    /// Consumes the parts and returns the router.
778    pub fn into_router(self) -> WorkerPoolRouter {
779        self.router
780    }
781
782    /// Consumes the parts, returning all owned components.
783    #[allow(clippy::type_complexity)]
784    pub fn into_components(
785        self,
786    ) -> (
787        WorkerPoolRouter,
788        Vec<WorkerPool>,
789        SharedMarketDataRef,
790        SharedDerivedDataRef,
791        JoinHandle<()>,
792        JoinHandle<()>,
793        JoinHandle<()>,
794        broadcast::Sender<()>,
795    ) {
796        (
797            self.router,
798            self.worker_pools,
799            self.market_data,
800            self.derived_data,
801            self.feed_handle,
802            self.gas_price_handle,
803            self.computation_handle,
804            self.computation_shutdown_tx,
805        )
806    }
807}