Skip to main content

fynd_core/worker_pool_router/
mod.rs

1//! Orchestrates multiple solver pools to find the best quote per request.
2//!
3//! The WorkerPoolRouter sits between the API layer and multiple solver pools.
4//! It fans out each order to all configured solvers, manages timeouts,
5//! selects the best quote based on `amount_out_net_gas`, and optionally
6//! encodes the winning solution into an on-chain transaction.
7
8//! # Responsibilities
9//!
10//! 1. **Fan-out**: Distribute each order to solver pools. Its distribution algorithm can be
11//!    customized, but initially it's set to relay to all solvers.
12//! 2. **Timeout**: Cancel if solver response takes too long
13//! 3. **Collection**: Wait for N responses OR timeout per order
14//! 4. **Selection**: Choose best quote (max `amount_out_net_gas`)
15//! 5. **Encoding**: If [`EncodingOptions`](crate::EncodingOptions) are provided in the request,
16//!    encode winning solutions into executable on-chain transactions via the
17//!    [`encoding::encoder::Encoder`](crate::encoding::encoder::Encoder)
18
19pub mod config;
20
21use std::{
22    collections::HashSet,
23    time::{Duration, Instant},
24};
25
26use config::WorkerPoolRouterConfig;
27use futures::stream::{FuturesUnordered, StreamExt};
28use metrics::{counter, histogram};
29use num_bigint::BigUint;
30use tracing::{debug, warn};
31use tycho_simulation::tycho_common::Bytes;
32
33use crate::{
34    encoding::encoder::Encoder, price_guard::guard::PriceGuard,
35    worker_pool::task_queue::TaskQueueHandle, BlockInfo, Order, OrderQuote, Quote, QuoteOptions,
36    QuoteRequest, QuoteStatus, SolveError,
37};
38
/// Handle to a solver pool for dispatching orders.
///
/// Pairs a pool's display name with the [`TaskQueueHandle`] used to enqueue
/// work on that pool. Cloning the handle clones both the name and the queue
/// handle.
#[derive(Clone)]
pub struct SolverPoolHandle {
    /// Human-readable name for this pool (used in logging & metrics).
    name: String,
    /// Queue handle for this pool.
    queue: TaskQueueHandle,
}
47
48impl SolverPoolHandle {
49    /// Creates a new solver pool handle.
50    pub fn new(name: impl Into<String>, queue: TaskQueueHandle) -> Self {
51        Self { name: name.into(), queue }
52    }
53
54    /// Returns the pool name.
55    pub fn name(&self) -> &str {
56        &self.name
57    }
58
59    /// Returns the task queue handle.
60    pub fn queue(&self) -> &TaskQueueHandle {
61        &self.queue
62    }
63}
64
/// Collected responses for a single order from multiple solvers.
///
/// Produced by `WorkerPoolRouter::solve_order` and consumed by
/// `WorkerPoolRouter::rank_quotes`.
#[derive(Debug)]
pub(crate) struct OrderResponses {
    /// ID of the order these responses correspond to.
    order_id: String,
    /// Quotes received from each solver pool (pool_name, quote).
    /// Every `Ok` response is stored here regardless of its status; ranking
    /// later keeps only the `Success` ones.
    quotes: Vec<(String, OrderQuote)>,
    /// Solver pools that failed with their respective errors (pool_name, error).
    /// This captures all error types: timeouts, no routes, algorithm errors, etc.
    failed_solvers: Vec<(String, SolveError)>,
}
76
/// Orchestrates multiple solver pools to find the best quote.
///
/// Each order of a request is sent to every registered pool; the collected
/// quotes are ranked, optionally validated by the price guard, and optionally
/// encoded into transactions.
pub struct WorkerPoolRouter {
    /// All registered solver pools.
    solver_pools: Vec<SolverPoolHandle>,
    /// Configuration for the worker router (default timeout and default
    /// minimum number of responses).
    config: WorkerPoolRouterConfig,
    /// Encoder for encoding solutions into on-chain transactions.
    encoder: Encoder,
    /// Validates solution outputs against external price sources.
    /// Present when the server has price guard enabled; `None` when disabled.
    price_guard: Option<PriceGuard>,
}
89
90impl WorkerPoolRouter {
91    /// Creates a new WorkerPoolRouter with the given solver pools, config, and encoder.
92    pub fn new(
93        solver_pools: Vec<SolverPoolHandle>,
94        config: WorkerPoolRouterConfig,
95        encoder: Encoder,
96    ) -> Self {
97        Self { solver_pools, config, encoder, price_guard: None }
98    }
99
100    /// Makes price guard validation available for this router.
101    ///
102    /// Providers are started and caches stay warm. Validation only runs for
103    /// requests where the client sets `enabled: true` in `PriceGuardConfig`.
104    pub fn with_price_guard(mut self, price_guard: PriceGuard) -> Self {
105        self.price_guard = Some(price_guard);
106        self
107    }
108
109    /// Returns the number of registered solver pools.
110    pub fn num_pools(&self) -> usize {
111        self.solver_pools.len()
112    }
113
114    /// Returns a quote by fanning out to all solver pools.
115    ///
116    /// For each order in the request:
117    /// 1. Sends the order to all solver pools in parallel
118    /// 2. Waits for responses with timeout
119    /// 3. Selects the best quote based on `amount_out_net_gas`
120    /// 4. If `encoding_options` are set on the request, encodes winning solutions into on-chain
121    ///    transactions
122    pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
123        let start = Instant::now();
124        let deadline = start + self.effective_timeout(request.options());
125        let min_responses = request
126            .options()
127            .min_responses()
128            .unwrap_or(self.config.min_responses());
129
130        if self.solver_pools.is_empty() {
131            return Err(SolveError::Internal("no solver pools configured".to_string()));
132        }
133
134        // Process each order independently in parallel
135        let order_futures: Vec<_> = request
136            .orders()
137            .iter()
138            .map(|order| self.solve_order(order.clone(), deadline, min_responses))
139            .collect();
140
141        let order_responses = futures::future::join_all(order_futures).await;
142
143        // Rank quotes for each order (sorted by amount_out_net_gas descending)
144        let ranked_quotes: Vec<Vec<OrderQuote>> = order_responses
145            .into_iter()
146            .map(|responses| self.rank_quotes(&responses, request.options()))
147            .collect();
148
149        // Validate against external prices when the client explicitly enables it.
150        let price_guard_config = request
151            .options()
152            .encoding_options()
153            .map(|e| e.price_guard())
154            .filter(|c| c.enabled());
155
156        let mut order_quotes: Vec<OrderQuote> = match (&self.price_guard, price_guard_config) {
157            (Some(guard), Some(config)) => guard
158                .validate(ranked_quotes, config)
159                .map_err(|e| {
160                    warn!(error = %e, "price guard validation error");
161                    SolveError::Internal(e.to_string())
162                })?,
163            (None, Some(_)) => {
164                return Err(SolveError::Internal(
165                    "price guard config provided but price guard is not enabled on this server"
166                        .to_string(),
167                ));
168            }
169            _ => ranked_quotes
170                .into_iter()
171                .filter_map(|candidates| candidates.into_iter().next())
172                .collect(),
173        };
174
175        // Encode solutions if encoding_options is set
176        if let Some(encoding_options) = request.options().encoding_options() {
177            order_quotes = self
178                .encoder
179                .encode(order_quotes, encoding_options.clone())
180                .await?;
181        }
182
183        // Calculate totals
184        let total_gas_estimate = order_quotes
185            .iter()
186            .map(|o| o.gas_estimate())
187            .fold(BigUint::ZERO, |acc, g| acc + g);
188
189        let solve_time_ms = start.elapsed().as_millis() as u64;
190
191        Ok(Quote::new(order_quotes, total_gas_estimate, solve_time_ms))
192    }
193
194    /// Solves a single order by fanning out to all solver pools.
195    async fn solve_order(
196        &self,
197        order: Order,
198        deadline: Instant,
199        min_responses: usize,
200    ) -> OrderResponses {
201        let start_time = Instant::now();
202        let order_id = order.id().to_string();
203
204        // Fan-out: send order to all solver pools
205        // perf: In the future, we can add new distribution algorithms, like sending short-timeout
206        // only to fast workers.
207        let mut pending: FuturesUnordered<_> = self
208            .solver_pools
209            .iter()
210            .map(|pool| {
211                let order_clone = order.clone();
212                let pool_name = pool.name().to_string();
213                let queue = pool.queue().clone();
214
215                async move {
216                    let result = queue.enqueue(order_clone).await;
217                    (pool_name, result)
218                }
219            })
220            .collect();
221
222        let mut quotes = Vec::new();
223        let mut failed_solvers: Vec<(String, SolveError)> = Vec::new();
224        let mut remaining_pools: HashSet<String> = self
225            .solver_pools
226            .iter()
227            .map(|p| p.name().to_string())
228            .collect();
229
230        // Collect responses with timeout
231        loop {
232            let deadline_instant = tokio::time::Instant::from_std(deadline);
233
234            tokio::select! {
235                // Always checks timeout first, ensuring we respect the deadline
236                biased;
237
238                // Timeout reached
239                _ = tokio::time::sleep_until(deadline_instant) => {
240                    // Mark all remaining pools as timed out
241                    let elapsed_ms = deadline.saturating_duration_since(Instant::now())
242                        .as_millis() as u64;
243                    for pool_name in remaining_pools.drain() {
244                        failed_solvers.push((
245                            pool_name,
246                            SolveError::Timeout { elapsed_ms },
247                        ));
248                    }
249                    break;
250                }
251
252                // Response received
253                result = pending.next() => {
254                    match result {
255                        Some((pool_name, Ok(single_quote))) => {
256                            // Remove from remaining
257                            remaining_pools.remove(&pool_name);
258
259                            // Extract the OrderQuote from SingleOrderQuote
260                            quotes.push((pool_name.clone(), single_quote.order().clone()));
261
262                            // Early return if min_responses reached
263                            if min_responses > 0 && quotes.len() >= min_responses {
264                                debug!(
265                                    order_id = %order_id,
266                                    responses = quotes.len(),
267                                    min_responses,
268                                    "early return: min_responses reached"
269                                );
270                                counter!("worker_router_early_returns_total").increment(1);
271                                break;
272                            }
273                        }
274                        Some((pool_name, Err(e))) => {
275                            remaining_pools.remove(&pool_name);
276                            warn!(
277                                pool = %pool_name,
278                                order_id = %order_id,
279                                error = %e,
280                                "solver pool failed"
281                            );
282                            failed_solvers.push((pool_name, e));
283                        }
284                        None => {
285                            // All futures completed
286                            break;
287                        }
288                    }
289                }
290            }
291        }
292
293        // Record metrics
294        let duration = start_time.elapsed().as_secs_f64();
295        histogram!("worker_router_solve_duration_seconds").record(duration);
296        histogram!("worker_router_solver_responses").record(quotes.len() as f64);
297
298        // Record failures by pool and error type
299        for (pool_name, error) in &failed_solvers {
300            let error_type = match error {
301                SolveError::Timeout { .. } => "timeout",
302                SolveError::NoRouteFound { .. } => "no_route",
303                SolveError::QueueFull => "queue_full",
304                SolveError::Internal(_) => "internal",
305                SolveError::PriceCheckFailed { .. } => "price_check_failed",
306                _ => "other",
307            };
308            counter!("worker_router_solver_failures_total", "pool" => pool_name.clone(), "error_type" => error_type).increment(1);
309        }
310
311        if !failed_solvers.is_empty() {
312            let timeout_count = failed_solvers
313                .iter()
314                .filter(|(_, e)| matches!(e, SolveError::Timeout { .. }))
315                .count();
316            let other_count = failed_solvers.len() - timeout_count;
317            warn!(
318                order_id = %order_id,
319                timeout_count,
320                other_failures = other_count,
321                "some solver pools failed"
322            );
323        }
324
325        OrderResponses { order_id, quotes, failed_solvers }
326    }
327
328    /// Returns all valid quotes for an order, ranked by `amount_out_net_gas` descending.
329    ///
330    /// If no valid quotes exist, returns a single-element vec with a placeholder
331    /// (`NoRouteFound` or `Timeout`) so that downstream always has at least one
332    /// candidate per order.
333    fn rank_quotes(&self, responses: &OrderResponses, options: &QuoteOptions) -> Vec<OrderQuote> {
334        let mut valid_quotes: Vec<_> = responses
335            .quotes
336            .iter()
337            .filter(|(_, q)| q.status() == QuoteStatus::Success)
338            .filter(|(_, q)| {
339                options
340                    .max_gas()
341                    .map(|max| q.gas_estimate() <= max)
342                    .unwrap_or(true)
343            })
344            .collect();
345
346        // Sort descending by amount_out_net_gas
347        valid_quotes.sort_by(|(_, a), (_, b)| {
348            b.amount_out_net_gas()
349                .cmp(a.amount_out_net_gas())
350        });
351
352        if !valid_quotes.is_empty() {
353            counter!("worker_router_orders_total", "status" => "success").increment(1);
354            let (pool_name, best) = valid_quotes[0];
355            counter!("worker_router_best_quote_pool", "pool" => pool_name.clone()).increment(1);
356            debug!(
357                order_id = %best.order_id(),
358                number_of_candidates = valid_quotes.len(),
359                "ranked quotes"
360            );
361            return valid_quotes
362                .into_iter()
363                .map(|(_, q)| q.clone())
364                .collect();
365        }
366
367        // No valid quote found - return a NoRouteFound response
368        // Try to get any response to extract block info, or create a placeholder
369        let fallback = if let Some((_, any_q)) = responses.quotes.first() {
370            counter!("worker_router_orders_total", "status" => "no_route").increment(1);
371            OrderQuote::new(
372                responses.order_id.clone(),
373                QuoteStatus::NoRouteFound,
374                any_q.amount_in().clone(),
375                BigUint::ZERO,
376                BigUint::ZERO,
377                BigUint::ZERO,
378                any_q.block().clone(),
379                String::new(),
380                any_q.sender().clone(),
381                any_q.receiver().clone(),
382            )
383        } else {
384            // No responses at all - determine status from failure types
385            let status = if responses.failed_solvers.is_empty() {
386                QuoteStatus::NoRouteFound
387            } else {
388                // If all failures are timeouts, report as Timeout
389                // Otherwise report as NoRouteFound (more general failure)
390                let all_timeouts = responses
391                    .failed_solvers
392                    .iter()
393                    .all(|(_, e)| matches!(e, SolveError::Timeout { .. }));
394                let all_not_ready = responses
395                    .failed_solvers
396                    .iter()
397                    .all(|(_, e)| matches!(e, SolveError::NotReady(_)));
398                if all_timeouts {
399                    QuoteStatus::Timeout
400                } else if all_not_ready {
401                    QuoteStatus::NotReady
402                } else {
403                    QuoteStatus::NoRouteFound
404                }
405            };
406
407            // Record status metric
408            let status_label = match status {
409                QuoteStatus::Timeout => "timeout",
410                QuoteStatus::NotReady => "not_ready",
411                _ => "no_route",
412            };
413            counter!("worker_router_orders_total", "status" => status_label).increment(1);
414
415            OrderQuote::new(
416                responses.order_id.clone(),
417                status,
418                BigUint::ZERO,
419                BigUint::ZERO,
420                BigUint::ZERO,
421                BigUint::ZERO,
422                BlockInfo::new(0, String::new(), 0),
423                String::new(),
424                Bytes::default(),
425                Bytes::default(),
426            )
427        };
428        vec![fallback]
429    }
430
431    /// Returns the effective timeout for a request.
432    fn effective_timeout(&self, options: &QuoteOptions) -> Duration {
433        options
434            .timeout_ms()
435            .map(Duration::from_millis)
436            .unwrap_or(self.config.default_timeout())
437    }
438}
439
440#[cfg(test)]
441mod tests {
442    use rstest::rstest;
443    use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
444    use tycho_simulation::{
445        tycho_common::models::Chain,
446        tycho_core::{
447            models::{token::Token, Address, Chain as SimChain},
448            Bytes,
449        },
450    };
451
452    use super::*;
453    use crate::{
454        algorithm::test_utils::{component, MockProtocolSim},
455        types::internal::SolveTask,
456        EncodingOptions, OrderSide, Route, SingleOrderQuote, Swap,
457    };
458
459    fn default_encoder() -> Encoder {
460        let registry = SwapEncoderRegistry::new(Chain::Ethereum)
461            .add_default_encoders(None)
462            .expect("default encoders should always succeed");
463        Encoder::new(Chain::Ethereum, registry).expect("encoder creation should succeed")
464    }
465
466    fn make_address(byte: u8) -> Address {
467        Address::from([byte; 20])
468    }
469
470    fn make_order() -> Order {
471        Order::new(
472            make_address(0x01),
473            make_address(0x02),
474            BigUint::from(1000u64),
475            OrderSide::Sell,
476            make_address(0xAA),
477        )
478        .with_id("test-order".to_string())
479    }
480
481    fn make_single_quote(amount_out_net_gas: u64) -> SingleOrderQuote {
482        let make_token = |addr: Address| Token {
483            address: addr,
484            symbol: "T".to_string(),
485            decimals: 18,
486            tax: Default::default(),
487            gas: vec![],
488            chain: SimChain::Ethereum,
489            quality: 100,
490        };
491        let tin = make_address(0x01);
492        let tout = make_address(0x02);
493        let swap = Swap::new(
494            "pool-1".to_string(),
495            "uniswap_v2".to_string(),
496            tin.clone(),
497            tout.clone(),
498            BigUint::from(1000u64),
499            BigUint::from(990u64),
500            BigUint::from(50_000u64),
501            component(
502                "0x0000000000000000000000000000000000000001",
503                &[make_token(tin), make_token(tout)],
504            ),
505            Box::new(MockProtocolSim::default()),
506        );
507        let quote = OrderQuote::new(
508            "test-order".to_string(),
509            QuoteStatus::Success,
510            BigUint::from(1000u64),
511            BigUint::from(990u64),
512            BigUint::from(100_000u64),
513            BigUint::from(amount_out_net_gas),
514            BlockInfo::new(1, "0x123".to_string(), 1000),
515            "test".to_string(),
516            Bytes::from(make_address(0xAA).as_ref()),
517            Bytes::from(make_address(0xAA).as_ref()),
518        )
519        .with_route(Route::new(vec![swap]));
520        SingleOrderQuote::new(quote, 5)
521    }
522
523    // Helper to create a mock solver pool that responds with a given solution
524    fn create_mock_pool(
525        name: &str,
526        response: Result<SingleOrderQuote, SolveError>,
527        delay_ms: u64,
528    ) -> (SolverPoolHandle, tokio::task::JoinHandle<()>) {
529        let (tx, rx) = async_channel::bounded::<SolveTask>(10);
530        let handle = TaskQueueHandle::from_sender(tx);
531
532        let worker = tokio::spawn(async move {
533            while let Ok(task) = rx.recv().await {
534                if delay_ms > 0 {
535                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
536                }
537                task.respond(response.clone());
538            }
539        });
540
541        (SolverPoolHandle::new(name, handle), worker)
542    }
543
544    #[test]
545    fn test_config_default() {
546        let config = WorkerPoolRouterConfig::default();
547        assert_eq!(config.default_timeout(), Duration::from_secs(1));
548        assert_eq!(config.min_responses(), 1);
549    }
550
551    #[test]
552    fn test_config_builder() {
553        let config = WorkerPoolRouterConfig::default()
554            .with_timeout(Duration::from_millis(500))
555            .with_min_responses(2);
556        assert_eq!(config.default_timeout(), Duration::from_millis(500));
557        assert_eq!(config.min_responses(), 2);
558    }
559
560    #[tokio::test]
561    async fn test_router_no_pools() {
562        let worker_router =
563            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
564        let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
565
566        let result = worker_router.quote(request).await;
567        assert!(matches!(result, Err(SolveError::Internal(_))));
568    }
569
570    #[tokio::test]
571    async fn test_router_single_pool_success() {
572        let (pool, worker) = create_mock_pool("pool_a", Ok(make_single_quote(900)), 0);
573
574        let worker_router =
575            WorkerPoolRouter::new(vec![pool], WorkerPoolRouterConfig::default(), default_encoder());
576        let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
577        let request = QuoteRequest::new(vec![make_order()], options);
578
579        let result = worker_router.quote(request).await;
580        assert!(result.is_ok());
581
582        let quote = result.unwrap();
583        assert_eq!(quote.orders().len(), 1);
584        assert_eq!(quote.orders()[0].status(), QuoteStatus::Success);
585        assert_eq!(*quote.orders()[0].amount_out_net_gas(), BigUint::from(900u64));
586        assert!(!quote.orders()[0]
587            .transaction()
588            .unwrap()
589            .data()
590            .is_empty());
591
592        drop(worker_router);
593        worker.abort();
594    }
595
596    #[tokio::test]
597    async fn test_router_selects_best_of_two() {
598        // Pool A: worse quote (net gas = 800)
599        let (pool_a, worker_a) = create_mock_pool("pool_a", Ok(make_single_quote(800)), 0);
600        // Pool B: better quote (net gas = 950)
601        let (pool_b, worker_b) = create_mock_pool("pool_b", Ok(make_single_quote(950)), 0);
602
603        // Wait for both responses to test best selection logic
604        let config = WorkerPoolRouterConfig::default().with_min_responses(2);
605        let worker_router = WorkerPoolRouter::new(vec![pool_a, pool_b], config, default_encoder());
606        let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
607        let request = QuoteRequest::new(vec![make_order()], options);
608
609        let result = worker_router.quote(request).await;
610        assert!(result.is_ok());
611
612        let quote = result.unwrap();
613        assert_eq!(quote.orders().len(), 1);
614        // Should select pool_b's quote (higher amount_out_net_gas)
615        assert_eq!(*quote.orders()[0].amount_out_net_gas(), BigUint::from(950u64));
616        assert!(!quote.orders()[0]
617            .transaction()
618            .unwrap()
619            .data()
620            .is_empty());
621
622        drop(worker_router);
623        worker_a.abort();
624        worker_b.abort();
625    }
626
627    #[tokio::test]
628    async fn test_router_timeout() {
629        // Pool that takes too long
630        let (pool, worker) = create_mock_pool("slow_pool", Ok(make_single_quote(900)), 500);
631
632        let config = WorkerPoolRouterConfig::default().with_timeout(Duration::from_millis(50));
633        let worker_router = WorkerPoolRouter::new(vec![pool], config, default_encoder());
634        let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
635
636        let result = worker_router.quote(request).await;
637        assert!(result.is_ok());
638
639        let quote = result.unwrap();
640        // Should timeout and return NoRouteFound or Timeout status
641        assert_eq!(quote.orders().len(), 1);
642        assert!(matches!(
643            quote.orders()[0].status(),
644            QuoteStatus::Timeout | QuoteStatus::NoRouteFound
645        ));
646
647        drop(worker_router);
648        worker.abort();
649    }
650
651    #[tokio::test]
652    async fn test_router_early_return_on_min_responses() {
653        // Pool A: fast
654        let (pool_a, worker_a) = create_mock_pool("fast_pool", Ok(make_single_quote(800)), 0);
655        // Pool B: slow (but we won't wait for it)
656        let (pool_b, worker_b) = create_mock_pool("slow_pool", Ok(make_single_quote(950)), 500);
657
658        let config = WorkerPoolRouterConfig::default()
659            .with_timeout(Duration::from_millis(1000))
660            .with_min_responses(1);
661        let worker_router = WorkerPoolRouter::new(vec![pool_a, pool_b], config, default_encoder());
662
663        let start = Instant::now();
664        let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
665        let request = QuoteRequest::new(vec![make_order()], options);
666
667        let result = worker_router.quote(request).await;
668        let elapsed = start.elapsed();
669
670        assert!(result.is_ok());
671        // Should return quickly (not waiting for pool_b)
672        assert!(elapsed < Duration::from_millis(200));
673
674        // Should have pool_a's quote
675        let quote = result.unwrap();
676        assert_eq!(quote.orders().len(), 1);
677        assert_eq!(quote.orders()[0].status(), QuoteStatus::Success);
678        // Should have encoding
679        assert!(!quote.orders()[0]
680            .transaction()
681            .unwrap()
682            .data()
683            .is_empty());
684
685        drop(worker_router);
686        worker_a.abort();
687        worker_b.abort();
688    }
689
690    #[rstest]
691    #[case::under_limit(100, Some(200), true)]
692    #[case::at_limit(200, Some(200), true)]
693    #[case::over_limit(300, Some(200), false)]
694    #[case::no_limit(500, None, true)]
695    fn test_max_gas_constraint(
696        #[case] gas_estimate: u64,
697        #[case] max_gas: Option<u64>,
698        #[case] should_pass: bool,
699    ) {
700        let responses = OrderResponses {
701            order_id: "test".to_string(),
702            quotes: vec![(
703                "pool".to_string(),
704                OrderQuote::new(
705                    "test".to_string(),
706                    QuoteStatus::Success,
707                    BigUint::from(1000u64),
708                    BigUint::from(990u64),
709                    BigUint::from(gas_estimate),
710                    BigUint::from(900u64),
711                    BlockInfo::new(1, "0x123".to_string(), 1000),
712                    "test".to_string(),
713                    Bytes::from(make_address(0xAA).as_ref()),
714                    Bytes::from(make_address(0xAA).as_ref()),
715                ),
716            )],
717            failed_solvers: vec![],
718        };
719
720        let options = match max_gas {
721            Some(gas) => QuoteOptions::default().with_max_gas(BigUint::from(gas)),
722            None => QuoteOptions::default(),
723        };
724
725        let worker_router =
726            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
727        let result = worker_router.rank_quotes(&responses, &options);
728
729        if should_pass {
730            assert_eq!(result[0].status(), QuoteStatus::Success);
731        } else {
732            assert_eq!(result[0].status(), QuoteStatus::NoRouteFound);
733        }
734    }
735
736    #[tokio::test]
737    async fn test_router_captures_solver_errors() {
738        // Pool that returns an error
739        let (pool, worker) = create_mock_pool(
740            "error_pool",
741            Err(SolveError::NoRouteFound { order_id: "test-order".to_string() }),
742            0,
743        );
744
745        let worker_router =
746            WorkerPoolRouter::new(vec![pool], WorkerPoolRouterConfig::default(), default_encoder());
747        let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
748
749        let result = worker_router.quote(request).await;
750        assert!(result.is_ok());
751
752        let quote = result.unwrap();
753        assert_eq!(quote.orders().len(), 1);
754        // Should be NoRouteFound since the only solver returned an error
755        assert_eq!(quote.orders()[0].status(), QuoteStatus::NoRouteFound);
756
757        drop(worker_router);
758        worker.abort();
759    }
760
761    #[test]
762    fn test_rank_quotes_all_timeouts_returns_timeout_status() {
763        let responses = OrderResponses {
764            order_id: "test".to_string(),
765            quotes: vec![],
766            failed_solvers: vec![
767                ("pool_a".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
768                ("pool_b".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
769            ],
770        };
771
772        let worker_router =
773            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
774        let result = worker_router.rank_quotes(&responses, &QuoteOptions::default());
775
776        assert_eq!(result.len(), 1);
777        assert_eq!(result[0].status(), QuoteStatus::Timeout);
778    }
779
780    #[test]
781    fn test_rank_quotes_mixed_failures_returns_no_route_found() {
782        let responses = OrderResponses {
783            order_id: "test".to_string(),
784            quotes: vec![],
785            failed_solvers: vec![
786                ("pool_a".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
787                ("pool_b".to_string(), SolveError::NoRouteFound { order_id: "test".to_string() }),
788            ],
789        };
790
791        let worker_router =
792            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
793        let result = worker_router.rank_quotes(&responses, &QuoteOptions::default());
794
795        assert_eq!(result.len(), 1);
796        assert_eq!(result[0].status(), QuoteStatus::NoRouteFound);
797    }
798
799    #[test]
800    fn test_rank_quotes_no_failures_returns_no_route_found() {
801        let responses =
802            OrderResponses { order_id: "test".to_string(), quotes: vec![], failed_solvers: vec![] };
803
804        let worker_router =
805            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
806        let result = worker_router.rank_quotes(&responses, &QuoteOptions::default());
807
808        assert_eq!(result.len(), 1);
809        assert_eq!(result[0].status(), QuoteStatus::NoRouteFound);
810    }
811
812    #[test]
813    fn test_rank_quotes_returns_sorted_candidates() {
814        let responses = OrderResponses {
815            order_id: "test".to_string(),
816            quotes: vec![
817                (
818                    "pool_a".to_string(),
819                    OrderQuote::new(
820                        "test".to_string(),
821                        QuoteStatus::Success,
822                        BigUint::from(1000u64),
823                        BigUint::from(800u64),
824                        BigUint::from(100_000u64),
825                        BigUint::from(800u64),
826                        BlockInfo::new(1, "0x123".to_string(), 1000),
827                        "test".to_string(),
828                        Bytes::from(make_address(0xAA).as_ref()),
829                        Bytes::from(make_address(0xAA).as_ref()),
830                    ),
831                ),
832                (
833                    "pool_b".to_string(),
834                    OrderQuote::new(
835                        "test".to_string(),
836                        QuoteStatus::Success,
837                        BigUint::from(1000u64),
838                        BigUint::from(950u64),
839                        BigUint::from(100_000u64),
840                        BigUint::from(950u64),
841                        BlockInfo::new(1, "0x123".to_string(), 1000),
842                        "test".to_string(),
843                        Bytes::from(make_address(0xAA).as_ref()),
844                        Bytes::from(make_address(0xAA).as_ref()),
845                    ),
846                ),
847            ],
848            failed_solvers: vec![],
849        };
850
851        let worker_router =
852            WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
853        let result = worker_router.rank_quotes(&responses, &QuoteOptions::default());
854
855        assert_eq!(result.len(), 2);
856        assert_eq!(*result[0].amount_out_net_gas(), BigUint::from(950u64));
857        assert_eq!(*result[1].amount_out_net_gas(), BigUint::from(800u64));
858    }
859}