1pub mod config;
20
21use std::{
22 collections::HashSet,
23 time::{Duration, Instant},
24};
25
26use config::WorkerPoolRouterConfig;
27use futures::stream::{FuturesUnordered, StreamExt};
28use metrics::{counter, histogram};
29use num_bigint::BigUint;
30use tracing::{debug, warn};
31use tycho_simulation::tycho_common::Bytes;
32
33use crate::{
34 encoding::encoder::Encoder, worker_pool::task_queue::TaskQueueHandle, BlockInfo, Order,
35 OrderQuote, Quote, QuoteOptions, QuoteRequest, QuoteStatus, SolveError,
36};
37
38#[derive(Clone)]
40pub struct SolverPoolHandle {
41 name: String,
43 queue: TaskQueueHandle,
45}
46
47impl SolverPoolHandle {
48 pub fn new(name: impl Into<String>, queue: TaskQueueHandle) -> Self {
50 Self { name: name.into(), queue }
51 }
52
53 pub fn name(&self) -> &str {
55 &self.name
56 }
57
58 pub fn queue(&self) -> &TaskQueueHandle {
60 &self.queue
61 }
62}
63
64#[derive(Debug)]
66pub(crate) struct OrderResponses {
67 order_id: String,
69 quotes: Vec<(String, OrderQuote)>,
71 failed_solvers: Vec<(String, SolveError)>,
74}
75
76pub struct WorkerPoolRouter {
78 solver_pools: Vec<SolverPoolHandle>,
80 config: WorkerPoolRouterConfig,
82 encoder: Encoder,
84}
85
86impl WorkerPoolRouter {
87 pub fn new(
89 solver_pools: Vec<SolverPoolHandle>,
90 config: WorkerPoolRouterConfig,
91 encoder: Encoder,
92 ) -> Self {
93 Self { solver_pools, config, encoder }
94 }
95
96 pub fn num_pools(&self) -> usize {
98 self.solver_pools.len()
99 }
100
101 pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
110 let start = Instant::now();
111 let deadline = start + self.effective_timeout(request.options());
112 let min_responses = request
113 .options()
114 .min_responses()
115 .unwrap_or(self.config.min_responses());
116
117 if self.solver_pools.is_empty() {
118 return Err(SolveError::Internal("no solver pools configured".to_string()));
119 }
120
121 let order_futures: Vec<_> = request
123 .orders()
124 .iter()
125 .map(|order| self.solve_order(order.clone(), deadline, min_responses))
126 .collect();
127
128 let order_responses = futures::future::join_all(order_futures).await;
129
130 let mut order_quotes: Vec<OrderQuote> = order_responses
132 .into_iter()
133 .map(|responses| self.select_best(&responses, request.options()))
134 .collect();
135
136 if let Some(encoding_options) = request.options().encoding_options() {
138 order_quotes = self
139 .encoder
140 .encode(order_quotes, encoding_options.clone())
141 .await?;
142 }
143
144 let total_gas_estimate = order_quotes
146 .iter()
147 .map(|o| o.gas_estimate())
148 .fold(BigUint::ZERO, |acc, g| acc + g);
149
150 let solve_time_ms = start.elapsed().as_millis() as u64;
151
152 Ok(Quote::new(order_quotes, total_gas_estimate, solve_time_ms))
153 }
154
155 async fn solve_order(
157 &self,
158 order: Order,
159 deadline: Instant,
160 min_responses: usize,
161 ) -> OrderResponses {
162 let start_time = Instant::now();
163 let order_id = order.id().to_string();
164
165 let mut pending: FuturesUnordered<_> = self
169 .solver_pools
170 .iter()
171 .map(|pool| {
172 let order_clone = order.clone();
173 let pool_name = pool.name().to_string();
174 let queue = pool.queue().clone();
175
176 async move {
177 let result = queue.enqueue(order_clone).await;
178 (pool_name, result)
179 }
180 })
181 .collect();
182
183 let mut quotes = Vec::new();
184 let mut failed_solvers: Vec<(String, SolveError)> = Vec::new();
185 let mut remaining_pools: HashSet<String> = self
186 .solver_pools
187 .iter()
188 .map(|p| p.name().to_string())
189 .collect();
190
191 loop {
193 let deadline_instant = tokio::time::Instant::from_std(deadline);
194
195 tokio::select! {
196 biased;
198
199 _ = tokio::time::sleep_until(deadline_instant) => {
201 let elapsed_ms = deadline.saturating_duration_since(Instant::now())
203 .as_millis() as u64;
204 for pool_name in remaining_pools.drain() {
205 failed_solvers.push((
206 pool_name,
207 SolveError::Timeout { elapsed_ms },
208 ));
209 }
210 break;
211 }
212
213 result = pending.next() => {
215 match result {
216 Some((pool_name, Ok(single_quote))) => {
217 remaining_pools.remove(&pool_name);
219
220 quotes.push((pool_name.clone(), single_quote.order().clone()));
222
223 if min_responses > 0 && quotes.len() >= min_responses {
225 debug!(
226 order_id = %order_id,
227 responses = quotes.len(),
228 min_responses,
229 "early return: min_responses reached"
230 );
231 counter!("worker_router_early_returns_total").increment(1);
232 break;
233 }
234 }
235 Some((pool_name, Err(e))) => {
236 remaining_pools.remove(&pool_name);
237 warn!(
238 pool = %pool_name,
239 order_id = %order_id,
240 error = %e,
241 "solver pool failed"
242 );
243 failed_solvers.push((pool_name, e));
244 }
245 None => {
246 break;
248 }
249 }
250 }
251 }
252 }
253
254 let duration = start_time.elapsed().as_secs_f64();
256 histogram!("worker_router_solve_duration_seconds").record(duration);
257 histogram!("worker_router_solver_responses").record(quotes.len() as f64);
258
259 for (pool_name, error) in &failed_solvers {
261 let error_type = match error {
262 SolveError::Timeout { .. } => "timeout",
263 SolveError::NoRouteFound { .. } => "no_route",
264 SolveError::QueueFull => "queue_full",
265 SolveError::Internal(_) => "internal",
266 _ => "other",
267 };
268 counter!("worker_router_solver_failures_total", "pool" => pool_name.clone(), "error_type" => error_type).increment(1);
269 }
270
271 if !failed_solvers.is_empty() {
272 let timeout_count = failed_solvers
273 .iter()
274 .filter(|(_, e)| matches!(e, SolveError::Timeout { .. }))
275 .count();
276 let other_count = failed_solvers.len() - timeout_count;
277 warn!(
278 order_id = %order_id,
279 timeout_count,
280 other_failures = other_count,
281 "some solver pools failed"
282 );
283 }
284
285 OrderResponses { order_id, quotes, failed_solvers }
286 }
287
288 fn select_best(&self, responses: &OrderResponses, options: &QuoteOptions) -> OrderQuote {
294 let valid_quotes: Vec<_> = responses
295 .quotes
296 .iter()
297 .filter(|(_, q)| q.status() == QuoteStatus::Success)
299 .filter(|(_, q)| {
301 options
302 .max_gas()
303 .map(|max| q.gas_estimate() <= max)
304 .unwrap_or(true)
305 })
306 .collect();
307
308 if let Some((pool_name, best)) = valid_quotes
310 .into_iter()
311 .max_by_key(|(_, q)| q.amount_out_net_gas())
312 {
313 counter!("worker_router_orders_total", "status" => "success").increment(1);
315 counter!("worker_router_best_quote_pool", "pool" => pool_name.clone()).increment(1);
316
317 debug!(
318 order_id = %best.order_id(),
319 pool = %pool_name,
320 amount_out_net_gas = %best.amount_out_net_gas(),
321 "selected best quote"
322 );
323 return best.clone();
324 }
325
326 if let Some((_, any_q)) = responses.quotes.first() {
329 counter!("worker_router_orders_total", "status" => "no_route").increment(1);
330 OrderQuote::new(
331 responses.order_id.clone(),
332 QuoteStatus::NoRouteFound,
333 any_q.amount_in().clone(),
334 BigUint::ZERO,
335 BigUint::ZERO,
336 BigUint::ZERO,
337 any_q.block().clone(),
338 String::new(),
339 any_q.sender().clone(),
340 any_q.receiver().clone(),
341 )
342 } else {
343 let status = if responses.failed_solvers.is_empty() {
345 QuoteStatus::NoRouteFound
346 } else {
347 let all_timeouts = responses
350 .failed_solvers
351 .iter()
352 .all(|(_, e)| matches!(e, SolveError::Timeout { .. }));
353 let all_not_ready = responses
354 .failed_solvers
355 .iter()
356 .all(|(_, e)| matches!(e, SolveError::NotReady(_)));
357 if all_timeouts {
358 QuoteStatus::Timeout
359 } else if all_not_ready {
360 QuoteStatus::NotReady
361 } else {
362 QuoteStatus::NoRouteFound
363 }
364 };
365
366 let status_label = match status {
368 QuoteStatus::Timeout => "timeout",
369 QuoteStatus::NotReady => "not_ready",
370 _ => "no_route",
371 };
372 counter!("worker_router_orders_total", "status" => status_label).increment(1);
373
374 OrderQuote::new(
375 responses.order_id.clone(),
376 status,
377 BigUint::ZERO,
378 BigUint::ZERO,
379 BigUint::ZERO,
380 BigUint::ZERO,
381 BlockInfo::new(0, String::new(), 0),
382 String::new(),
383 Bytes::default(),
384 Bytes::default(),
385 )
386 }
387 }
388
389 fn effective_timeout(&self, options: &QuoteOptions) -> Duration {
391 options
392 .timeout_ms()
393 .map(Duration::from_millis)
394 .unwrap_or(self.config.default_timeout())
395 }
396}
397
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
    use tycho_simulation::{
        tycho_common::models::Chain,
        tycho_core::{
            models::{token::Token, Address, Chain as SimChain},
            Bytes,
        },
    };

    use super::*;
    use crate::{
        algorithm::test_utils::{component, MockProtocolSim},
        types::internal::SolveTask,
        EncodingOptions, OrderSide, Route, SingleOrderQuote, Swap,
    };

    // ---------------------------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------------------------

    /// Encoder for Ethereum backed by the default swap encoder registry.
    fn default_encoder() -> Encoder {
        let registry = SwapEncoderRegistry::new(Chain::Ethereum)
            .add_default_encoders(None)
            .expect("default encoders should always succeed");
        Encoder::new(Chain::Ethereum, registry).expect("encoder creation should succeed")
    }

    /// Router with no pools, used by tests that call `select_best` directly.
    fn router_without_pools() -> WorkerPoolRouter {
        WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder())
    }

    /// 20-byte address with every byte set to `byte`.
    fn make_address(byte: u8) -> Address {
        Address::from([byte; 20])
    }

    /// Sell order with the fixed id `test-order`.
    fn make_order() -> Order {
        let order = Order::new(
            make_address(0x01),
            make_address(0x02),
            BigUint::from(1000u64),
            OrderSide::Sell,
            make_address(0xAA),
        );
        order.with_id("test-order".to_string())
    }

    /// Request holding the single test order and default options (no encoding).
    fn plain_request() -> QuoteRequest {
        QuoteRequest::new(vec![make_order()], QuoteOptions::default())
    }

    /// Request holding the single test order with encoding enabled.
    fn encoding_request() -> QuoteRequest {
        let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
        QuoteRequest::new(vec![make_order()], options)
    }

    /// Asserts that the order quote carries a transaction with non-empty data.
    fn assert_has_transaction_data(order_quote: &OrderQuote) {
        assert!(!order_quote
            .transaction()
            .unwrap()
            .data()
            .is_empty());
    }

    /// Successful single-swap quote for `test-order` with the given `amount_out_net_gas`.
    fn make_single_quote(amount_out_net_gas: u64) -> SingleOrderQuote {
        let make_token = |addr: Address| Token {
            address: addr,
            symbol: "T".to_string(),
            decimals: 18,
            tax: Default::default(),
            gas: vec![],
            chain: SimChain::Ethereum,
            quality: 100,
        };
        let token_in = make_address(0x01);
        let token_out = make_address(0x02);
        let swap = Swap::new(
            "pool-1".to_string(),
            "uniswap_v2".to_string(),
            token_in.clone(),
            token_out.clone(),
            BigUint::from(1000u64),
            BigUint::from(990u64),
            BigUint::from(50_000u64),
            component(
                "0x0000000000000000000000000000000000000001",
                &[make_token(token_in), make_token(token_out)],
            ),
            Box::new(MockProtocolSim::default()),
        );
        let route = Route::new(vec![swap]);
        let order_quote = OrderQuote::new(
            "test-order".to_string(),
            QuoteStatus::Success,
            BigUint::from(1000u64),
            BigUint::from(990u64),
            BigUint::from(100_000u64),
            BigUint::from(amount_out_net_gas),
            BlockInfo::new(1, "0x123".to_string(), 1000),
            "test".to_string(),
            Bytes::from(make_address(0xAA).as_ref()),
            Bytes::from(make_address(0xAA).as_ref()),
        )
        .with_route(route);
        SingleOrderQuote::new(order_quote, 5)
    }

    /// Spawns a worker that answers every task with `response` after `delay_ms` milliseconds,
    /// and returns the pool handle feeding it together with the worker's join handle.
    fn create_mock_pool(
        name: &str,
        response: Result<SingleOrderQuote, SolveError>,
        delay_ms: u64,
    ) -> (SolverPoolHandle, tokio::task::JoinHandle<()>) {
        let (tx, rx) = async_channel::bounded::<SolveTask>(10);
        let pool = SolverPoolHandle::new(name, TaskQueueHandle::from_sender(tx));

        let worker = tokio::spawn(async move {
            loop {
                // The channel closes once every sender is dropped; stop serving then.
                let Ok(task) = rx.recv().await else {
                    break;
                };
                if delay_ms > 0 {
                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                }
                task.respond(response.clone());
            }
        });

        (pool, worker)
    }

    // ---------------------------------------------------------------------------------------
    // Configuration
    // ---------------------------------------------------------------------------------------

    #[test]
    fn test_config_default() {
        let defaults = WorkerPoolRouterConfig::default();
        assert_eq!(defaults.default_timeout(), Duration::from_secs(1));
        assert_eq!(defaults.min_responses(), 1);
    }

    #[test]
    fn test_config_builder() {
        let customised = WorkerPoolRouterConfig::default()
            .with_timeout(Duration::from_millis(500))
            .with_min_responses(2);
        assert_eq!(customised.default_timeout(), Duration::from_millis(500));
        assert_eq!(customised.min_responses(), 2);
    }

    // ---------------------------------------------------------------------------------------
    // End-to-end routing
    // ---------------------------------------------------------------------------------------

    #[tokio::test]
    async fn test_router_no_pools() {
        let worker_router = router_without_pools();

        let result = worker_router.quote(plain_request()).await;

        assert!(matches!(result, Err(SolveError::Internal(_))));
    }

    #[tokio::test]
    async fn test_router_single_pool_success() {
        let (pool, worker) = create_mock_pool("pool_a", Ok(make_single_quote(900)), 0);
        let worker_router =
            WorkerPoolRouter::new(vec![pool], WorkerPoolRouterConfig::default(), default_encoder());

        let result = worker_router.quote(encoding_request()).await;
        assert!(result.is_ok());

        let quote = result.unwrap();
        let orders = quote.orders();
        assert_eq!(orders.len(), 1);
        assert_eq!(orders[0].status(), QuoteStatus::Success);
        assert_eq!(*orders[0].amount_out_net_gas(), BigUint::from(900u64));
        assert_has_transaction_data(&orders[0]);

        drop(worker_router);
        worker.abort();
    }

    #[tokio::test]
    async fn test_router_selects_best_of_two() {
        let (pool_a, worker_a) = create_mock_pool("pool_a", Ok(make_single_quote(800)), 0);
        let (pool_b, worker_b) = create_mock_pool("pool_b", Ok(make_single_quote(950)), 0);

        // Require both pools to answer so the better quote is always among the candidates.
        let config = WorkerPoolRouterConfig::default().with_min_responses(2);
        let worker_router = WorkerPoolRouter::new(vec![pool_a, pool_b], config, default_encoder());

        let result = worker_router.quote(encoding_request()).await;
        assert!(result.is_ok());

        let quote = result.unwrap();
        let orders = quote.orders();
        assert_eq!(orders.len(), 1);
        assert_eq!(*orders[0].amount_out_net_gas(), BigUint::from(950u64));
        assert_has_transaction_data(&orders[0]);

        drop(worker_router);
        worker_a.abort();
        worker_b.abort();
    }

    #[tokio::test]
    async fn test_router_timeout() {
        // The pool needs 500 ms to answer but the router only waits 50 ms.
        let (pool, worker) = create_mock_pool("slow_pool", Ok(make_single_quote(900)), 500);
        let config = WorkerPoolRouterConfig::default().with_timeout(Duration::from_millis(50));
        let worker_router = WorkerPoolRouter::new(vec![pool], config, default_encoder());

        let result = worker_router.quote(plain_request()).await;
        assert!(result.is_ok());

        let quote = result.unwrap();
        assert_eq!(quote.orders().len(), 1);
        assert!(matches!(
            quote.orders()[0].status(),
            QuoteStatus::Timeout | QuoteStatus::NoRouteFound
        ));

        drop(worker_router);
        worker.abort();
    }

    #[tokio::test]
    async fn test_router_early_return_on_min_responses() {
        let (pool_a, worker_a) = create_mock_pool("fast_pool", Ok(make_single_quote(800)), 0);
        let (pool_b, worker_b) = create_mock_pool("slow_pool", Ok(make_single_quote(950)), 500);

        let config = WorkerPoolRouterConfig::default()
            .with_timeout(Duration::from_millis(1000))
            .with_min_responses(1);
        let worker_router = WorkerPoolRouter::new(vec![pool_a, pool_b], config, default_encoder());

        let start = Instant::now();
        let request = encoding_request();
        let result = worker_router.quote(request).await;
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        // Returning well before the slow pool's 500 ms delay shows the router did not wait.
        assert!(elapsed < Duration::from_millis(200));

        let quote = result.unwrap();
        let orders = quote.orders();
        assert_eq!(orders.len(), 1);
        assert_eq!(orders[0].status(), QuoteStatus::Success);
        assert_has_transaction_data(&orders[0]);

        drop(worker_router);
        worker_a.abort();
        worker_b.abort();
    }

    #[tokio::test]
    async fn test_router_captures_solver_errors() {
        let (pool, worker) = create_mock_pool(
            "error_pool",
            Err(SolveError::NoRouteFound { order_id: "test-order".to_string() }),
            0,
        );
        let worker_router =
            WorkerPoolRouter::new(vec![pool], WorkerPoolRouterConfig::default(), default_encoder());

        let result = worker_router.quote(plain_request()).await;
        assert!(result.is_ok());

        let quote = result.unwrap();
        assert_eq!(quote.orders().len(), 1);
        assert_eq!(quote.orders()[0].status(), QuoteStatus::NoRouteFound);

        drop(worker_router);
        worker.abort();
    }

    // ---------------------------------------------------------------------------------------
    // Quote selection
    // ---------------------------------------------------------------------------------------

    #[rstest]
    #[case::under_limit(100, Some(200), true)]
    #[case::at_limit(200, Some(200), true)]
    #[case::over_limit(300, Some(200), false)]
    #[case::no_limit(500, None, true)]
    fn test_max_gas_constraint(
        #[case] gas_estimate: u64,
        #[case] max_gas: Option<u64>,
        #[case] should_pass: bool,
    ) {
        let candidate = OrderQuote::new(
            "test".to_string(),
            QuoteStatus::Success,
            BigUint::from(1000u64),
            BigUint::from(990u64),
            BigUint::from(gas_estimate),
            BigUint::from(900u64),
            BlockInfo::new(1, "0x123".to_string(), 1000),
            "test".to_string(),
            Bytes::from(make_address(0xAA).as_ref()),
            Bytes::from(make_address(0xAA).as_ref()),
        );
        let responses = OrderResponses {
            order_id: "test".to_string(),
            quotes: vec![("pool".to_string(), candidate)],
            failed_solvers: vec![],
        };

        let options = match max_gas {
            Some(gas) => QuoteOptions::default().with_max_gas(BigUint::from(gas)),
            None => QuoteOptions::default(),
        };

        let result = router_without_pools().select_best(&responses, &options);

        let expected = if should_pass { QuoteStatus::Success } else { QuoteStatus::NoRouteFound };
        assert_eq!(result.status(), expected);
    }

    #[test]
    fn test_select_best_all_timeouts_returns_timeout_status() {
        let responses = OrderResponses {
            order_id: "test".to_string(),
            quotes: vec![],
            failed_solvers: vec![
                ("pool_a".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
                ("pool_b".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
            ],
        };

        let result = router_without_pools().select_best(&responses, &QuoteOptions::default());

        assert_eq!(result.status(), QuoteStatus::Timeout);
    }

    #[test]
    fn test_select_best_mixed_failures_returns_no_route_found() {
        let responses = OrderResponses {
            order_id: "test".to_string(),
            quotes: vec![],
            failed_solvers: vec![
                ("pool_a".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
                ("pool_b".to_string(), SolveError::NoRouteFound { order_id: "test".to_string() }),
            ],
        };

        let result = router_without_pools().select_best(&responses, &QuoteOptions::default());

        assert_eq!(result.status(), QuoteStatus::NoRouteFound);
    }

    #[test]
    fn test_select_best_no_failures_returns_no_route_found() {
        let responses =
            OrderResponses { order_id: "test".to_string(), quotes: vec![], failed_solvers: vec![] };

        let result = router_without_pools().select_best(&responses, &QuoteOptions::default());

        assert_eq!(result.status(), QuoteStatus::NoRouteFound);
    }
}