Skip to main content

fynd_core/worker_pool_router/
mod.rs

1//! Orchestrates multiple solver pools to find the best quote per request.
2//!
3//! The WorkerPoolRouter sits between the API layer and multiple solver pools.
4//! It fans out each order to all configured solvers, manages timeouts,
5//! selects the best quote based on `amount_out_net_gas`, and optionally
6//! encodes the winning solution into an on-chain transaction.
7
8//! # Responsibilities
9//!
10//! 1. **Fan-out**: Distribute each order to solver pools. Its distribution algorithm can be
11//!    customized, but initially it's set to relay to all solvers.
12//! 2. **Timeout**: Cancel if solver response takes too long
13//! 3. **Collection**: Wait for N responses OR timeout per order
14//! 4. **Gas refinement**: Before cross-pool ranking, replace each candidate's naive
15//!    `route.total_gas()` estimate (used internally by algorithms for intra-pool ranking) with the
16//!    more accurate `estimate_gas_usage` from tycho-execution, which accounts for token transfer
17//!    costs and router overhead. The `amount_out_net_gas` values are rescaled proportionally so the
18//!    final ranking reflects realistic execution cost.
19//! 5. **Selection**: Choose best quote (max refined `amount_out_net_gas`)
20//! 6. **Encoding**: If [`EncodingOptions`](crate::EncodingOptions) are provided in the request,
21//!    encode winning solutions into executable on-chain transactions via the
22//!    [`encoding::encoder::Encoder`](crate::encoding::encoder::Encoder)
23
24pub mod config;
25
26use std::{
27    collections::HashSet,
28    time::{Duration, Instant},
29};
30
31use config::WorkerPoolRouterConfig;
32use futures::stream::{FuturesUnordered, StreamExt};
33use metrics::{counter, histogram};
34use num_bigint::BigUint;
35use tracing::{debug, warn};
36use tycho_execution::encoding::{
37    evm::gas_estimator::estimate_gas_usage,
38    models::{Solution, Strategy},
39};
40use tycho_simulation::tycho_common::Bytes;
41
42use crate::{
43    encoding::encoder::Encoder, price_guard::guard::PriceGuard,
44    worker_pool::task_queue::TaskQueueHandle, BlockInfo, EncodingOptions, Order, OrderQuote, Quote,
45    QuoteOptions, QuoteRequest, QuoteStatus, SolveError, SolveParams,
46};
47
48/// Handle to a solver pool for dispatching orders.
49#[derive(Clone)]
50pub struct SolverPoolHandle {
51    /// Human-readable name for this pool (used in logging & metrics).
52    name: String,
53    /// Queue handle for this pool.
54    queue: TaskQueueHandle,
55}
56
57impl SolverPoolHandle {
58    /// Creates a new solver pool handle.
59    pub fn new(name: impl Into<String>, queue: TaskQueueHandle) -> Self {
60        Self { name: name.into(), queue }
61    }
62
63    /// Returns the pool name.
64    pub fn name(&self) -> &str {
65        &self.name
66    }
67
68    /// Returns the task queue handle.
69    pub fn queue(&self) -> &TaskQueueHandle {
70        &self.queue
71    }
72}
73
74/// Collected responses for a single order from multiple solvers.
75#[derive(Debug)]
76pub(crate) struct OrderResponses {
77    /// ID of the order these responses correspond to.
78    order_id: String,
79    /// Quotes received from each solver pool (pool_name, quote).
80    quotes: Vec<(String, OrderQuote)>,
81    /// Solver pools that failed with their respective errors (pool_name, error).
82    /// This captures all error types: timeouts, no routes, algorithm errors, etc.
83    failed_solvers: Vec<(String, SolveError)>,
84}
85
86/// Orchestrates multiple solver pools to find the best quote.
87pub struct WorkerPoolRouter {
88    /// All registered solver pools.
89    solver_pools: Vec<SolverPoolHandle>,
90    /// Configuration for the worker router.
91    config: WorkerPoolRouterConfig,
92    /// Encoder for encoding solutions into on-chain transactions.
93    encoder: Encoder,
94    /// Validates solution outputs against external price sources.
95    /// Present when the server has price guard enabled; `None` when disabled.
96    price_guard: Option<PriceGuard>,
97}
98
99impl WorkerPoolRouter {
100    /// Creates a new WorkerPoolRouter with the given solver pools, config, and encoder.
101    pub fn new(
102        solver_pools: Vec<SolverPoolHandle>,
103        config: WorkerPoolRouterConfig,
104        encoder: Encoder,
105    ) -> Self {
106        Self { solver_pools, config, encoder, price_guard: None }
107    }
108
109    /// Makes price guard validation available for this router.
110    ///
111    /// Providers are started and caches stay warm. Validation only runs for
112    /// requests where the client sets `enabled: true` in `PriceGuardConfig`.
113    pub fn with_price_guard(mut self, price_guard: PriceGuard) -> Self {
114        self.price_guard = Some(price_guard);
115        self
116    }
117
118    /// Returns the number of registered solver pools.
119    pub fn num_pools(&self) -> usize {
120        self.solver_pools.len()
121    }
122
123    /// Returns a quote by fanning out to all solver pools.
124    ///
125    /// For each order in the request:
126    /// 1. Sends the order to all solver pools in parallel
127    /// 2. Waits for responses with timeout
128    /// 3. Selects the best quote based on `amount_out_net_gas`
129    /// 4. If `encoding_options` are set on the request, encodes winning solutions into on-chain
130    ///    transactions
131    pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
132        let start = Instant::now();
133        let deadline = start + self.effective_timeout(request.options());
134        let min_responses = request
135            .options()
136            .min_responses()
137            .unwrap_or(self.config.min_responses());
138
139        if self.solver_pools.is_empty() {
140            return Err(SolveError::Internal("no solver pools configured".to_string()));
141        }
142
143        let params = match request.options().state_label().cloned() {
144            Some(label) => SolveParams::default().with_state_label(label),
145            None => SolveParams::default(),
146        };
147
148        // Process each order independently in parallel
149        let order_futures: Vec<_> = request
150            .orders()
151            .iter()
152            .map(|order| self.solve_order(order.clone(), params.clone(), deadline, min_responses))
153            .collect();
154
155        let mut order_responses = futures::future::join_all(order_futures).await;
156
157        // Refine gas estimates for all candidates using estimate_gas_usage before ranking,
158        // so ranking uses accurate gas costs rather than naive route.total_gas().
159        if let Some(encoding_options) = request.options().encoding_options() {
160            refine_gas_estimates(&mut order_responses, encoding_options)?;
161        }
162
163        // Rank quotes for each order (sorted by refined amount_out_net_gas descending)
164        let ranked_quotes: Vec<Vec<OrderQuote>> = order_responses
165            .into_iter()
166            .map(|responses| self.rank_quotes(&responses, request.options()))
167            .collect();
168
169        // Validate against external prices when the client explicitly enables it.
170        let price_guard_config = request
171            .options()
172            .encoding_options()
173            .map(|e| e.price_guard())
174            .filter(|c| c.enabled());
175
176        let mut order_quotes: Vec<OrderQuote> = match (&self.price_guard, price_guard_config) {
177            (Some(guard), Some(config)) => guard
178                .validate(ranked_quotes, config)
179                .map_err(|e| {
180                    warn!(error = %e, "price guard validation error");
181                    SolveError::Internal(e.to_string())
182                })?,
183            (None, Some(_)) => {
184                return Err(SolveError::Internal(
185                    "price guard config provided but price guard is not enabled on this server"
186                        .to_string(),
187                ));
188            }
189            _ => ranked_quotes
190                .into_iter()
191                .filter_map(|candidates| candidates.into_iter().next())
192                .collect(),
193        };
194
195        // Encode solutions if encoding_options is set
196        if let Some(encoding_options) = request.options().encoding_options() {
197            order_quotes = self
198                .encoder
199                .encode(order_quotes, encoding_options.clone())
200                .await?;
201        }
202
203        // Calculate totals
204        let total_gas_estimate = order_quotes
205            .iter()
206            .map(|o| o.gas_estimate())
207            .fold(BigUint::ZERO, |acc, g| acc + g);
208
209        let solve_time_ms = start.elapsed().as_millis() as u64;
210
211        Ok(Quote::new(order_quotes, total_gas_estimate, solve_time_ms))
212    }
213
214    /// Solves a single order by fanning out to all solver pools.
215    async fn solve_order(
216        &self,
217        order: Order,
218        params: SolveParams,
219        deadline: Instant,
220        min_responses: usize,
221    ) -> OrderResponses {
222        let start_time = Instant::now();
223        let order_id = order.id().to_string();
224
225        // Fan-out: send order to all solver pools
226        // perf: In the future, we can add new distribution algorithms, like sending short-timeout
227        // only to fast workers.
228        let mut pending: FuturesUnordered<_> = self
229            .solver_pools
230            .iter()
231            .map(|pool| {
232                let order_clone = order.clone();
233                let pool_name = pool.name().to_string();
234                let queue = pool.queue().clone();
235                let task_params = params.clone();
236
237                async move {
238                    let result = queue
239                        .enqueue(order_clone, task_params)
240                        .await;
241                    (pool_name, result)
242                }
243            })
244            .collect();
245
246        let mut quotes = Vec::new();
247        let mut failed_solvers: Vec<(String, SolveError)> = Vec::new();
248        let mut remaining_pools: HashSet<String> = self
249            .solver_pools
250            .iter()
251            .map(|p| p.name().to_string())
252            .collect();
253
254        // Collect responses with timeout
255        loop {
256            let deadline_instant = tokio::time::Instant::from_std(deadline);
257
258            tokio::select! {
259                // Always checks timeout first, ensuring we respect the deadline
260                biased;
261
262                // Timeout reached
263                _ = tokio::time::sleep_until(deadline_instant) => {
264                    // Mark all remaining pools as timed out
265                    let elapsed_ms = deadline.saturating_duration_since(Instant::now())
266                        .as_millis() as u64;
267                    for pool_name in remaining_pools.drain() {
268                        failed_solvers.push((
269                            pool_name,
270                            SolveError::Timeout { elapsed_ms },
271                        ));
272                    }
273                    break;
274                }
275
276                // Response received
277                result = pending.next() => {
278                    match result {
279                        Some((pool_name, Ok(single_quote))) => {
280                            // Remove from remaining
281                            remaining_pools.remove(&pool_name);
282
283                            // Extract the OrderQuote from SingleOrderQuote
284                            quotes.push((pool_name.clone(), single_quote.order().clone()));
285
286                            // Early return if min_responses reached
287                            if min_responses > 0 && quotes.len() >= min_responses {
288                                debug!(
289                                    order_id = %order_id,
290                                    responses = quotes.len(),
291                                    min_responses,
292                                    "early return: min_responses reached"
293                                );
294                                counter!("worker_router_early_returns_total").increment(1);
295                                break;
296                            }
297                        }
298                        Some((pool_name, Err(e))) => {
299                            remaining_pools.remove(&pool_name);
300                            warn!(
301                                pool = %pool_name,
302                                order_id = %order_id,
303                                error = %e,
304                                "solver pool failed"
305                            );
306                            failed_solvers.push((pool_name, e));
307                        }
308                        None => {
309                            // All futures completed
310                            break;
311                        }
312                    }
313                }
314            }
315        }
316
317        // Record metrics
318        let duration = start_time.elapsed().as_secs_f64();
319        histogram!("worker_router_solve_duration_seconds").record(duration);
320        histogram!("worker_router_solver_responses").record(quotes.len() as f64);
321
322        // Record failures by pool and error type
323        for (pool_name, error) in &failed_solvers {
324            let error_type = match error {
325                SolveError::Timeout { .. } => "timeout",
326                SolveError::NoRouteFound { .. } => "no_route",
327                SolveError::QueueFull => "queue_full",
328                SolveError::Internal(_) => "internal",
329                SolveError::PriceCheckFailed { .. } => "price_check_failed",
330                _ => "other",
331            };
332            counter!("worker_router_solver_failures_total", "pool" => pool_name.clone(), "error_type" => error_type).increment(1);
333        }
334
335        if !failed_solvers.is_empty() {
336            let timeout_count = failed_solvers
337                .iter()
338                .filter(|(_, e)| matches!(e, SolveError::Timeout { .. }))
339                .count();
340            let other_count = failed_solvers.len() - timeout_count;
341            warn!(
342                order_id = %order_id,
343                timeout_count,
344                other_failures = other_count,
345                "some solver pools failed"
346            );
347        }
348
349        OrderResponses { order_id, quotes, failed_solvers }
350    }
351
352    /// Returns all valid quotes for an order, ranked by `amount_out_net_gas` descending.
353    ///
354    /// If no valid quotes exist, returns a single-element vec with a placeholder
355    /// (`NoRouteFound` or `Timeout`) so that downstream always has at least one
356    /// candidate per order.
357    fn rank_quotes(&self, responses: &OrderResponses, options: &QuoteOptions) -> Vec<OrderQuote> {
358        let mut valid_quotes: Vec<_> = responses
359            .quotes
360            .iter()
361            .filter(|(_, q)| q.status() == QuoteStatus::Success)
362            .filter(|(_, q)| {
363                options
364                    .max_gas()
365                    .map(|max| q.gas_estimate() <= max)
366                    .unwrap_or(true)
367            })
368            .collect();
369
370        // Sort descending by amount_out_net_gas
371        valid_quotes.sort_by(|(_, a), (_, b)| {
372            b.amount_out_net_gas()
373                .cmp(a.amount_out_net_gas())
374        });
375
376        if !valid_quotes.is_empty() {
377            counter!("worker_router_orders_total", "status" => "success").increment(1);
378            let (pool_name, best) = valid_quotes[0];
379            counter!("worker_router_best_quote_pool", "pool" => pool_name.clone()).increment(1);
380            debug!(
381                order_id = %best.order_id(),
382                number_of_candidates = valid_quotes.len(),
383                "ranked quotes"
384            );
385            return valid_quotes
386                .into_iter()
387                .map(|(_, q)| q.clone())
388                .collect();
389        }
390
391        // No valid quote found - return a NoRouteFound response
392        // Try to get any response to extract block info, or create a placeholder
393        let fallback = if let Some((_, any_q)) = responses.quotes.first() {
394            counter!("worker_router_orders_total", "status" => "no_route").increment(1);
395            OrderQuote::new(
396                responses.order_id.clone(),
397                QuoteStatus::NoRouteFound,
398                any_q.amount_in().clone(),
399                BigUint::ZERO,
400                BigUint::ZERO,
401                BigUint::ZERO,
402                any_q.block().clone(),
403                String::new(),
404                any_q.sender().clone(),
405                any_q.receiver().clone(),
406                any_q.solved_against().clone(),
407            )
408        } else {
409            // No responses at all - determine status from failure types
410            let status = if responses.failed_solvers.is_empty() {
411                QuoteStatus::NoRouteFound
412            } else {
413                // If all failures are timeouts, report as Timeout
414                // Otherwise report as NoRouteFound (more general failure)
415                let all_timeouts = responses
416                    .failed_solvers
417                    .iter()
418                    .all(|(_, e)| matches!(e, SolveError::Timeout { .. }));
419                let all_not_ready = responses
420                    .failed_solvers
421                    .iter()
422                    .all(|(_, e)| matches!(e, SolveError::NotReady(_)));
423                if all_timeouts {
424                    QuoteStatus::Timeout
425                } else if all_not_ready {
426                    QuoteStatus::NotReady
427                } else {
428                    QuoteStatus::NoRouteFound
429                }
430            };
431
432            // Record status metric
433            let status_label = match status {
434                QuoteStatus::Timeout => "timeout",
435                QuoteStatus::NotReady => "not_ready",
436                _ => "no_route",
437            };
438            counter!("worker_router_orders_total", "status" => status_label).increment(1);
439
440            // No worker responded — use the requested label if set, otherwise "0"
441            // (we have no block context here since no worker completed).
442            let label = options
443                .state_label()
444                .cloned()
445                .unwrap_or_else(|| "0".to_string());
446            OrderQuote::new(
447                responses.order_id.clone(),
448                status,
449                BigUint::ZERO,
450                BigUint::ZERO,
451                BigUint::ZERO,
452                BigUint::ZERO,
453                BlockInfo::new(0, String::new(), 0),
454                String::new(),
455                Bytes::default(),
456                Bytes::default(),
457                label,
458            )
459        };
460        vec![fallback]
461    }
462
463    /// Returns the effective timeout for a request.
464    fn effective_timeout(&self, options: &QuoteOptions) -> Duration {
465        options
466            .timeout_ms()
467            .map(Duration::from_millis)
468            .unwrap_or(self.config.default_timeout())
469    }
470}
471
472fn refine_gas_estimates(
473    order_responses: &mut Vec<OrderResponses>,
474    encoding_options: &EncodingOptions,
475) -> Result<(), SolveError> {
476    for responses in order_responses {
477        for (_, quote) in &mut responses.quotes {
478            if quote.status() != QuoteStatus::Success {
479                continue;
480            }
481            let solution = Solution::try_from(&*quote)?
482                .with_user_transfer_type(encoding_options.transfer_type().clone());
483            let refined_gas = estimate_gas_usage(&solution, derive_strategy(quote));
484            let naive_gas = quote.gas_estimate().clone();
485            if naive_gas > BigUint::ZERO {
486                let gas_cost_in_token_out = quote.amount_out() - quote.amount_out_net_gas();
487                let new_gas_cost = &gas_cost_in_token_out * &refined_gas / &naive_gas;
488                let new_net = if new_gas_cost <= *quote.amount_out() {
489                    quote.amount_out() - &new_gas_cost
490                } else {
491                    BigUint::ZERO
492                };
493                quote.set_amount_out_net_gas(new_net);
494                quote.set_gas_estimate(refined_gas);
495            }
496        }
497    }
498    Ok(())
499}
500
501fn derive_strategy(quote: &OrderQuote) -> Strategy {
502    let Some(route) = quote.route() else { return Strategy::Single };
503    let swaps = route.swaps();
504    if swaps.len() == 1 {
505        Strategy::Single
506    } else if swaps.iter().any(|s| *s.split() > 0.0) {
507        Strategy::Split
508    } else {
509        Strategy::Sequential
510    }
511}
512
513#[cfg(test)]
514mod tests {
515    use std::collections::HashMap;
516
517    use rstest::rstest;
518    use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
519    use tycho_simulation::{
520        tycho_common::models::Chain,
521        tycho_core::{
522            models::{token::Token, Address, Chain as SimChain},
523            Bytes,
524        },
525    };
526
527    use super::*;
528    use crate::{
529        algorithm::test_utils::{component, MockProtocolSim},
530        types::internal::SolveTask,
531        EncodingOptions, OrderSide, Route, SingleOrderQuote, Swap,
532    };
533
534    fn default_encoder() -> Encoder {
535        let registry = SwapEncoderRegistry::new(Chain::Ethereum)
536            .add_default_encoders(None)
537            .expect("default encoders should always succeed");
538        Encoder::new(Chain::Ethereum, registry).expect("encoder creation should succeed")
539    }
540
541    fn make_address(byte: u8) -> Address {
542        Address::from([byte; 20])
543    }
544
545    fn make_order() -> Order {
546        Order::new(
547            make_address(0x01),
548            make_address(0x02),
549            BigUint::from(1000u64),
550            OrderSide::Sell,
551            make_address(0xAA),
552        )
553        .with_id("test-order".to_string())
554    }
555
556    fn make_single_quote(amount_out_net_gas: u64) -> SingleOrderQuote {
557        let make_token = |addr: Address| Token {
558            address: addr,
559            symbol: "T".to_string(),
560            decimals: 18,
561            tax: Default::default(),
562            gas: vec![],
563            chain: SimChain::Ethereum,
564            quality: 100,
565        };
566        let tin = make_address(0x01);
567        let tout = make_address(0x02);
568        let tin_token = make_token(tin.clone());
569        let tout_token = make_token(tout.clone());
570        let swap = Swap::new(
571            "pool-1".to_string(),
572            "uniswap_v2".to_string(),
573            tin.clone(),
574            tout.clone(),
575            BigUint::from(1000u64),
576            BigUint::from(990u64),
577            BigUint::from(50_000u64),
578            component(
579                "0x0000000000000000000000000000000000000001",
580                &[tin_token.clone(), tout_token.clone()],
581            ),
582            Box::new(MockProtocolSim::default()),
583        );
584        let mut tokens = HashMap::new();
585        tokens.insert(tin, tin_token);
586        tokens.insert(tout, tout_token);
587        let quote = OrderQuote::new(
588            "test-order".to_string(),
589            QuoteStatus::Success,
590            BigUint::from(1000u64),
591            BigUint::from(990u64),
592            BigUint::from(100_000u64),
593            BigUint::from(amount_out_net_gas),
594            BlockInfo::new(1, "0x123".to_string(), 1000),
595            "test".to_string(),
596            Bytes::from(make_address(0xAA).as_ref()),
597            Bytes::from(make_address(0xAA).as_ref()),
598            "1".to_string(),
599        )
600        .with_route(Route::new(vec![swap], tokens));
601        SingleOrderQuote::new(quote, 5)
602    }
603
604    // Helper to create a mock solver pool that responds with a given solution
605    fn create_mock_pool(
606        name: &str,
607        response: Result<SingleOrderQuote, SolveError>,
608        delay_ms: u64,
609    ) -> (SolverPoolHandle, tokio::task::JoinHandle<()>) {
610        let (tx, rx) = async_channel::bounded::<SolveTask>(10);
611        let handle = TaskQueueHandle::from_sender(tx);
612
613        let worker = tokio::spawn(async move {
614            while let Ok(task) = rx.recv().await {
615                if delay_ms > 0 {
616                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
617                }
618                task.respond(response.clone());
619            }
620        });
621
622        (SolverPoolHandle::new(name, handle), worker)
623    }
624
625    #[test]
626    fn test_config_default() {
627        let config = WorkerPoolRouterConfig::default();
628        assert_eq!(config.default_timeout(), Duration::from_secs(1));
629        assert_eq!(config.min_responses(), 1);
630    }
631
632    #[test]
633    fn test_config_builder() {
634        let config = WorkerPoolRouterConfig::default()
635            .with_timeout(Duration::from_millis(500))
636            .with_min_responses(2);
637        assert_eq!(config.default_timeout(), Duration::from_millis(500));
638        assert_eq!(config.min_responses(), 2);
639    }
640
641    #[tokio::test]
642    async fn test_router_no_pools() {
643        let worker_router =
644            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
645        let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
646
647        let result = worker_router.quote(request).await;
648        assert!(matches!(result, Err(SolveError::Internal(_))));
649    }
650
651    #[tokio::test]
652    async fn test_router_single_pool_success() {
653        let (pool, worker) = create_mock_pool("pool_a", Ok(make_single_quote(900)), 0);
654
655        let worker_router =
656            WorkerPoolRouter::new(vec![pool], WorkerPoolRouterConfig::default(), default_encoder());
657        let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
658        let request = QuoteRequest::new(vec![make_order()], options);
659
660        let result = worker_router.quote(request).await;
661        assert!(result.is_ok());
662
663        let quote = result.unwrap();
664        assert_eq!(quote.orders().len(), 1);
665        assert_eq!(quote.orders()[0].status(), QuoteStatus::Success);
666        // amount_out_net_gas is refined using estimate_gas_usage before ranking
667        assert_eq!(*quote.orders()[0].amount_out_net_gas(), BigUint::from(873u64));
668        assert!(!quote.orders()[0]
669            .transaction()
670            .unwrap()
671            .data()
672            .is_empty());
673
674        drop(worker_router);
675        worker.abort();
676    }
677
678    #[tokio::test]
679    async fn test_router_selects_best_of_two() {
680        // Pool A: worse quote (net gas = 800)
681        let (pool_a, worker_a) = create_mock_pool("pool_a", Ok(make_single_quote(800)), 0);
682        // Pool B: better quote (net gas = 950)
683        let (pool_b, worker_b) = create_mock_pool("pool_b", Ok(make_single_quote(950)), 0);
684
685        // Wait for both responses to test best selection logic
686        let config = WorkerPoolRouterConfig::default().with_min_responses(2);
687        let worker_router = WorkerPoolRouter::new(vec![pool_a, pool_b], config, default_encoder());
688        let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
689        let request = QuoteRequest::new(vec![make_order()], options);
690
691        let result = worker_router.quote(request).await;
692        assert!(result.is_ok());
693
694        let quote = result.unwrap();
695        assert_eq!(quote.orders().len(), 1);
696        // Pool B wins (higher refined amount_out_net_gas after estimate_gas_usage)
697        assert_eq!(*quote.orders()[0].amount_out_net_gas(), BigUint::from(938u64));
698        assert!(!quote.orders()[0]
699            .transaction()
700            .unwrap()
701            .data()
702            .is_empty());
703
704        drop(worker_router);
705        worker_a.abort();
706        worker_b.abort();
707    }
708
709    #[tokio::test]
710    async fn test_router_timeout() {
711        // Pool that takes too long
712        let (pool, worker) = create_mock_pool("slow_pool", Ok(make_single_quote(900)), 500);
713
714        let config = WorkerPoolRouterConfig::default().with_timeout(Duration::from_millis(50));
715        let worker_router = WorkerPoolRouter::new(vec![pool], config, default_encoder());
716        let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
717
718        let result = worker_router.quote(request).await;
719        assert!(result.is_ok());
720
721        let quote = result.unwrap();
722        // Should timeout and return NoRouteFound or Timeout status
723        assert_eq!(quote.orders().len(), 1);
724        assert!(matches!(
725            quote.orders()[0].status(),
726            QuoteStatus::Timeout | QuoteStatus::NoRouteFound
727        ));
728
729        drop(worker_router);
730        worker.abort();
731    }
732
733    #[tokio::test]
734    async fn test_router_early_return_on_min_responses() {
735        // Pool A: fast
736        let (pool_a, worker_a) = create_mock_pool("fast_pool", Ok(make_single_quote(800)), 0);
737        // Pool B: slow (but we won't wait for it)
738        let (pool_b, worker_b) = create_mock_pool("slow_pool", Ok(make_single_quote(950)), 500);
739
740        let config = WorkerPoolRouterConfig::default()
741            .with_timeout(Duration::from_millis(1000))
742            .with_min_responses(1);
743        let worker_router = WorkerPoolRouter::new(vec![pool_a, pool_b], config, default_encoder());
744
745        let start = Instant::now();
746        let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
747        let request = QuoteRequest::new(vec![make_order()], options);
748
749        let result = worker_router.quote(request).await;
750        let elapsed = start.elapsed();
751
752        assert!(result.is_ok());
753        // Should return quickly (not waiting for pool_b)
754        assert!(elapsed < Duration::from_millis(200));
755
756        // Should have pool_a's quote
757        let quote = result.unwrap();
758        assert_eq!(quote.orders().len(), 1);
759        assert_eq!(quote.orders()[0].status(), QuoteStatus::Success);
760        // Should have encoding
761        assert!(!quote.orders()[0]
762            .transaction()
763            .unwrap()
764            .data()
765            .is_empty());
766
767        drop(worker_router);
768        worker_a.abort();
769        worker_b.abort();
770    }
771
772    #[rstest]
773    #[case::under_limit(100, Some(200), true)]
774    #[case::at_limit(200, Some(200), true)]
775    #[case::over_limit(300, Some(200), false)]
776    #[case::no_limit(500, None, true)]
777    fn test_max_gas_constraint(
778        #[case] gas_estimate: u64,
779        #[case] max_gas: Option<u64>,
780        #[case] should_pass: bool,
781    ) {
782        let responses = OrderResponses {
783            order_id: "test".to_string(),
784            quotes: vec![(
785                "pool".to_string(),
786                OrderQuote::new(
787                    "test".to_string(),
788                    QuoteStatus::Success,
789                    BigUint::from(1000u64),
790                    BigUint::from(990u64),
791                    BigUint::from(gas_estimate),
792                    BigUint::from(900u64),
793                    BlockInfo::new(1, "0x123".to_string(), 1000),
794                    "test".to_string(),
795                    Bytes::from(make_address(0xAA).as_ref()),
796                    Bytes::from(make_address(0xAA).as_ref()),
797                    "1".to_string(),
798                ),
799            )],
800            failed_solvers: vec![],
801        };
802
803        let options = match max_gas {
804            Some(gas) => QuoteOptions::default().with_max_gas(BigUint::from(gas)),
805            None => QuoteOptions::default(),
806        };
807
808        let worker_router =
809            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
810        let result = worker_router.rank_quotes(&responses, &options);
811
812        if should_pass {
813            assert_eq!(result[0].status(), QuoteStatus::Success);
814        } else {
815            assert_eq!(result[0].status(), QuoteStatus::NoRouteFound);
816        }
817    }
818
819    #[tokio::test]
820    async fn test_router_captures_solver_errors() {
821        // Pool that returns an error
822        let (pool, worker) = create_mock_pool(
823            "error_pool",
824            Err(SolveError::NoRouteFound { order_id: "test-order".to_string() }),
825            0,
826        );
827
828        let worker_router =
829            WorkerPoolRouter::new(vec![pool], WorkerPoolRouterConfig::default(), default_encoder());
830        let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
831
832        let result = worker_router.quote(request).await;
833        assert!(result.is_ok());
834
835        let quote = result.unwrap();
836        assert_eq!(quote.orders().len(), 1);
837        // Should be NoRouteFound since the only solver returned an error
838        assert_eq!(quote.orders()[0].status(), QuoteStatus::NoRouteFound);
839
840        drop(worker_router);
841        worker.abort();
842    }
843
844    #[test]
845    fn test_rank_quotes_all_timeouts_returns_timeout_status() {
846        let responses = OrderResponses {
847            order_id: "test".to_string(),
848            quotes: vec![],
849            failed_solvers: vec![
850                ("pool_a".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
851                ("pool_b".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
852            ],
853        };
854
855        let worker_router =
856            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
857        let result = worker_router.rank_quotes(&responses, &QuoteOptions::default());
858
859        assert_eq!(result.len(), 1);
860        assert_eq!(result[0].status(), QuoteStatus::Timeout);
861    }
862
863    #[test]
864    fn test_rank_quotes_mixed_failures_returns_no_route_found() {
865        let responses = OrderResponses {
866            order_id: "test".to_string(),
867            quotes: vec![],
868            failed_solvers: vec![
869                ("pool_a".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
870                ("pool_b".to_string(), SolveError::NoRouteFound { order_id: "test".to_string() }),
871            ],
872        };
873
874        let worker_router =
875            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
876        let result = worker_router.rank_quotes(&responses, &QuoteOptions::default());
877
878        assert_eq!(result.len(), 1);
879        assert_eq!(result[0].status(), QuoteStatus::NoRouteFound);
880    }
881
882    #[test]
883    fn test_rank_quotes_no_failures_returns_no_route_found() {
884        let responses =
885            OrderResponses { order_id: "test".to_string(), quotes: vec![], failed_solvers: vec![] };
886
887        let worker_router =
888            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
889        let result = worker_router.rank_quotes(&responses, &QuoteOptions::default());
890
891        assert_eq!(result.len(), 1);
892        assert_eq!(result[0].status(), QuoteStatus::NoRouteFound);
893    }
894
895    #[test]
896    fn test_rank_quotes_returns_sorted_candidates() {
897        let responses = OrderResponses {
898            order_id: "test".to_string(),
899            quotes: vec![
900                (
901                    "pool_a".to_string(),
902                    OrderQuote::new(
903                        "test".to_string(),
904                        QuoteStatus::Success,
905                        BigUint::from(1000u64),
906                        BigUint::from(800u64),
907                        BigUint::from(100_000u64),
908                        BigUint::from(800u64),
909                        BlockInfo::new(1, "0x123".to_string(), 1000),
910                        "test".to_string(),
911                        Bytes::from(make_address(0xAA).as_ref()),
912                        Bytes::from(make_address(0xAA).as_ref()),
913                        "1".to_string(),
914                    ),
915                ),
916                (
917                    "pool_b".to_string(),
918                    OrderQuote::new(
919                        "test".to_string(),
920                        QuoteStatus::Success,
921                        BigUint::from(1000u64),
922                        BigUint::from(950u64),
923                        BigUint::from(100_000u64),
924                        BigUint::from(950u64),
925                        BlockInfo::new(1, "0x123".to_string(), 1000),
926                        "test".to_string(),
927                        Bytes::from(make_address(0xAA).as_ref()),
928                        Bytes::from(make_address(0xAA).as_ref()),
929                        "1".to_string(),
930                    ),
931                ),
932            ],
933            failed_solvers: vec![],
934        };
935
936        let worker_router =
937            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
938        let result = worker_router.rank_quotes(&responses, &QuoteOptions::default());
939
940        assert_eq!(result.len(), 2);
941        assert_eq!(*result[0].amount_out_net_gas(), BigUint::from(950u64));
942        assert_eq!(*result[1].amount_out_net_gas(), BigUint::from(800u64));
943    }
944}