1pub mod config;
19
20use std::{
21 collections::HashSet,
22 time::{Duration, Instant},
23};
24
25use config::WorkerPoolRouterConfig;
26use futures::stream::{FuturesUnordered, StreamExt};
27use metrics::{counter, histogram};
28use num_bigint::BigUint;
29use tracing::{debug, warn};
30use tycho_simulation::tycho_common::Bytes;
31
32use crate::{
33 encoding::encoder::Encoder, worker_pool::task_queue::TaskQueueHandle, BlockInfo, Order,
34 OrderQuote, Quote, QuoteOptions, QuoteRequest, QuoteStatus, SolveError,
35};
36
37#[derive(Clone)]
39pub struct SolverPoolHandle {
40 name: String,
42 queue: TaskQueueHandle,
44}
45
46impl SolverPoolHandle {
47 pub fn new(name: impl Into<String>, queue: TaskQueueHandle) -> Self {
49 Self { name: name.into(), queue }
50 }
51
52 pub fn name(&self) -> &str {
54 &self.name
55 }
56
57 pub fn queue(&self) -> &TaskQueueHandle {
59 &self.queue
60 }
61}
62
63#[derive(Debug)]
65pub(crate) struct OrderResponses {
66 order_id: String,
68 quotes: Vec<(String, OrderQuote)>,
70 failed_solvers: Vec<(String, SolveError)>,
73}
74
75pub struct WorkerPoolRouter {
77 solver_pools: Vec<SolverPoolHandle>,
79 config: WorkerPoolRouterConfig,
81 encoder: Encoder,
83}
84
85impl WorkerPoolRouter {
86 pub fn new(
88 solver_pools: Vec<SolverPoolHandle>,
89 config: WorkerPoolRouterConfig,
90 encoder: Encoder,
91 ) -> Self {
92 Self { solver_pools, config, encoder }
93 }
94
95 pub fn num_pools(&self) -> usize {
97 self.solver_pools.len()
98 }
99
100 pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
109 let start = Instant::now();
110 let deadline = start + self.effective_timeout(request.options());
111 let min_responses = request
112 .options()
113 .min_responses()
114 .unwrap_or(self.config.min_responses());
115
116 if self.solver_pools.is_empty() {
117 return Err(SolveError::Internal("no solver pools configured".to_string()));
118 }
119
120 let order_futures: Vec<_> = request
122 .orders()
123 .iter()
124 .map(|order| self.solve_order(order.clone(), deadline, min_responses))
125 .collect();
126
127 let order_responses = futures::future::join_all(order_futures).await;
128
129 let mut order_quotes: Vec<OrderQuote> = order_responses
131 .into_iter()
132 .map(|responses| self.select_best(&responses, request.options()))
133 .collect();
134
135 if let Some(encoding_options) = request.options().encoding_options() {
137 order_quotes = self
138 .encoder
139 .encode(order_quotes, encoding_options.clone())
140 .await?;
141 }
142
143 let total_gas_estimate = order_quotes
145 .iter()
146 .map(|o| o.gas_estimate())
147 .fold(BigUint::ZERO, |acc, g| acc + g);
148
149 let solve_time_ms = start.elapsed().as_millis() as u64;
150
151 Ok(Quote::new(order_quotes, total_gas_estimate, solve_time_ms))
152 }
153
154 async fn solve_order(
156 &self,
157 order: Order,
158 deadline: Instant,
159 min_responses: usize,
160 ) -> OrderResponses {
161 let start_time = Instant::now();
162 let order_id = order.id().to_string();
163
164 let mut pending: FuturesUnordered<_> = self
168 .solver_pools
169 .iter()
170 .map(|pool| {
171 let order_clone = order.clone();
172 let pool_name = pool.name().to_string();
173 let queue = pool.queue().clone();
174
175 async move {
176 let result = queue.enqueue(order_clone).await;
177 (pool_name, result)
178 }
179 })
180 .collect();
181
182 let mut quotes = Vec::new();
183 let mut failed_solvers: Vec<(String, SolveError)> = Vec::new();
184 let mut remaining_pools: HashSet<String> = self
185 .solver_pools
186 .iter()
187 .map(|p| p.name().to_string())
188 .collect();
189
190 loop {
192 let deadline_instant = tokio::time::Instant::from_std(deadline);
193
194 tokio::select! {
195 biased;
197
198 _ = tokio::time::sleep_until(deadline_instant) => {
200 let elapsed_ms = deadline.saturating_duration_since(Instant::now())
202 .as_millis() as u64;
203 for pool_name in remaining_pools.drain() {
204 failed_solvers.push((
205 pool_name,
206 SolveError::Timeout { elapsed_ms },
207 ));
208 }
209 break;
210 }
211
212 result = pending.next() => {
214 match result {
215 Some((pool_name, Ok(single_quote))) => {
216 remaining_pools.remove(&pool_name);
218
219 quotes.push((pool_name.clone(), single_quote.order().clone()));
221
222 if min_responses > 0 && quotes.len() >= min_responses {
224 debug!(
225 order_id = %order_id,
226 responses = quotes.len(),
227 min_responses,
228 "early return: min_responses reached"
229 );
230 counter!("worker_router_early_returns_total").increment(1);
231 break;
232 }
233 }
234 Some((pool_name, Err(e))) => {
235 remaining_pools.remove(&pool_name);
236 warn!(
237 pool = %pool_name,
238 order_id = %order_id,
239 error = %e,
240 "solver pool failed"
241 );
242 failed_solvers.push((pool_name, e));
243 }
244 None => {
245 break;
247 }
248 }
249 }
250 }
251 }
252
253 let duration = start_time.elapsed().as_secs_f64();
255 histogram!("worker_router_solve_duration_seconds").record(duration);
256 histogram!("worker_router_solver_responses").record(quotes.len() as f64);
257
258 for (pool_name, error) in &failed_solvers {
260 let error_type = match error {
261 SolveError::Timeout { .. } => "timeout",
262 SolveError::NoRouteFound { .. } => "no_route",
263 SolveError::QueueFull => "queue_full",
264 SolveError::Internal(_) => "internal",
265 _ => "other",
266 };
267 counter!("worker_router_solver_failures_total", "pool" => pool_name.clone(), "error_type" => error_type).increment(1);
268 }
269
270 if !failed_solvers.is_empty() {
271 let timeout_count = failed_solvers
272 .iter()
273 .filter(|(_, e)| matches!(e, SolveError::Timeout { .. }))
274 .count();
275 let other_count = failed_solvers.len() - timeout_count;
276 warn!(
277 order_id = %order_id,
278 timeout_count,
279 other_failures = other_count,
280 "some solver pools failed"
281 );
282 }
283
284 OrderResponses { order_id, quotes, failed_solvers }
285 }
286
287 fn select_best(&self, responses: &OrderResponses, options: &QuoteOptions) -> OrderQuote {
293 let valid_quotes: Vec<_> = responses
294 .quotes
295 .iter()
296 .filter(|(_, q)| q.status() == QuoteStatus::Success)
298 .filter(|(_, q)| {
300 options
301 .max_gas()
302 .map(|max| q.gas_estimate() <= max)
303 .unwrap_or(true)
304 })
305 .collect();
306
307 if let Some((pool_name, best)) = valid_quotes
309 .into_iter()
310 .max_by_key(|(_, q)| q.amount_out_net_gas())
311 {
312 counter!("worker_router_orders_total", "status" => "success").increment(1);
314 counter!("worker_router_best_quote_pool", "pool" => pool_name.clone()).increment(1);
315
316 debug!(
317 order_id = %best.order_id(),
318 pool = %pool_name,
319 amount_out_net_gas = %best.amount_out_net_gas(),
320 "selected best quote"
321 );
322 return best.clone();
323 }
324
325 if let Some((_, any_q)) = responses.quotes.first() {
328 counter!("worker_router_orders_total", "status" => "no_route").increment(1);
329 OrderQuote::new(
330 responses.order_id.clone(),
331 QuoteStatus::NoRouteFound,
332 any_q.amount_in().clone(),
333 BigUint::ZERO,
334 BigUint::ZERO,
335 BigUint::ZERO,
336 any_q.block().clone(),
337 String::new(),
338 any_q.sender().clone(),
339 any_q.receiver().clone(),
340 )
341 } else {
342 let status = if responses.failed_solvers.is_empty() {
344 QuoteStatus::NoRouteFound
345 } else {
346 let all_timeouts = responses
349 .failed_solvers
350 .iter()
351 .all(|(_, e)| matches!(e, SolveError::Timeout { .. }));
352 let all_not_ready = responses
353 .failed_solvers
354 .iter()
355 .all(|(_, e)| matches!(e, SolveError::NotReady(_)));
356 if all_timeouts {
357 QuoteStatus::Timeout
358 } else if all_not_ready {
359 QuoteStatus::NotReady
360 } else {
361 QuoteStatus::NoRouteFound
362 }
363 };
364
365 let status_label = match status {
367 QuoteStatus::Timeout => "timeout",
368 QuoteStatus::NotReady => "not_ready",
369 _ => "no_route",
370 };
371 counter!("worker_router_orders_total", "status" => status_label).increment(1);
372
373 OrderQuote::new(
374 responses.order_id.clone(),
375 status,
376 BigUint::ZERO,
377 BigUint::ZERO,
378 BigUint::ZERO,
379 BigUint::ZERO,
380 BlockInfo::new(0, String::new(), 0),
381 String::new(),
382 Bytes::default(),
383 Bytes::default(),
384 )
385 }
386 }
387
388 fn effective_timeout(&self, options: &QuoteOptions) -> Duration {
390 options
391 .timeout_ms()
392 .map(Duration::from_millis)
393 .unwrap_or(self.config.default_timeout())
394 }
395}
396
397#[cfg(test)]
398mod tests {
399 use rstest::rstest;
400 use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
401 use tycho_simulation::{
402 tycho_common::models::Chain,
403 tycho_core::{
404 models::{token::Token, Address, Chain as SimChain},
405 Bytes,
406 },
407 };
408
409 use super::*;
410 use crate::{
411 algorithm::test_utils::{component, MockProtocolSim},
412 types::internal::SolveTask,
413 EncodingOptions, OrderSide, Route, SingleOrderQuote, Swap,
414 };
415
416 fn default_encoder() -> Encoder {
417 let registry = SwapEncoderRegistry::new(Chain::Ethereum)
418 .add_default_encoders(None)
419 .expect("default encoders should always succeed");
420 Encoder::new(Chain::Ethereum, registry).expect("encoder creation should succeed")
421 }
422
423 fn make_address(byte: u8) -> Address {
424 Address::from([byte; 20])
425 }
426
427 fn make_order() -> Order {
428 Order::new(
429 make_address(0x01),
430 make_address(0x02),
431 BigUint::from(1000u64),
432 OrderSide::Sell,
433 make_address(0xAA),
434 )
435 .with_id("test-order".to_string())
436 }
437
438 fn make_single_quote(amount_out_net_gas: u64) -> SingleOrderQuote {
439 let make_token = |addr: Address| Token {
440 address: addr,
441 symbol: "T".to_string(),
442 decimals: 18,
443 tax: Default::default(),
444 gas: vec![],
445 chain: SimChain::Ethereum,
446 quality: 100,
447 };
448 let tin = make_address(0x01);
449 let tout = make_address(0x02);
450 let swap = Swap::new(
451 "pool-1".to_string(),
452 "uniswap_v2".to_string(),
453 tin.clone(),
454 tout.clone(),
455 BigUint::from(1000u64),
456 BigUint::from(990u64),
457 BigUint::from(50_000u64),
458 component(
459 "0x0000000000000000000000000000000000000001",
460 &[make_token(tin), make_token(tout)],
461 ),
462 Box::new(MockProtocolSim::default()),
463 );
464 let quote = OrderQuote::new(
465 "test-order".to_string(),
466 QuoteStatus::Success,
467 BigUint::from(1000u64),
468 BigUint::from(990u64),
469 BigUint::from(100_000u64),
470 BigUint::from(amount_out_net_gas),
471 BlockInfo::new(1, "0x123".to_string(), 1000),
472 "test".to_string(),
473 Bytes::from(make_address(0xAA).as_ref()),
474 Bytes::from(make_address(0xAA).as_ref()),
475 )
476 .with_route(Route::new(vec![swap]));
477 SingleOrderQuote::new(quote, 5)
478 }
479
480 fn create_mock_pool(
482 name: &str,
483 response: Result<SingleOrderQuote, SolveError>,
484 delay_ms: u64,
485 ) -> (SolverPoolHandle, tokio::task::JoinHandle<()>) {
486 let (tx, rx) = async_channel::bounded::<SolveTask>(10);
487 let handle = TaskQueueHandle::from_sender(tx);
488
489 let worker = tokio::spawn(async move {
490 while let Ok(task) = rx.recv().await {
491 if delay_ms > 0 {
492 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
493 }
494 task.respond(response.clone());
495 }
496 });
497
498 (SolverPoolHandle::new(name, handle), worker)
499 }
500
/// The default configuration uses a one-second timeout and a single required response.
#[test]
fn test_config_default() {
    let defaults = WorkerPoolRouterConfig::default();

    let expected_timeout = Duration::from_secs(1);
    assert_eq!(defaults.default_timeout(), expected_timeout);
    assert_eq!(defaults.min_responses(), 1);
}
507
/// The builder methods override the timeout and the required response count.
#[test]
fn test_config_builder() {
    let custom_timeout = Duration::from_millis(500);
    let base = WorkerPoolRouterConfig::default();
    let config = base.with_timeout(custom_timeout).with_min_responses(2);

    assert_eq!(config.default_timeout(), Duration::from_millis(500));
    assert_eq!(config.min_responses(), 2);
}
516
517 #[tokio::test]
518 async fn test_router_no_pools() {
519 let worker_router =
520 WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
521 let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
522
523 let result = worker_router.quote(request).await;
524 assert!(matches!(result, Err(SolveError::Internal(_))));
525 }
526
527 #[tokio::test]
528 async fn test_router_single_pool_success() {
529 let (pool, worker) = create_mock_pool("pool_a", Ok(make_single_quote(900)), 0);
530
531 let worker_router =
532 WorkerPoolRouter::new(vec![pool], WorkerPoolRouterConfig::default(), default_encoder());
533 let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
534 let request = QuoteRequest::new(vec![make_order()], options);
535
536 let result = worker_router.quote(request).await;
537 assert!(result.is_ok());
538
539 let quote = result.unwrap();
540 assert_eq!(quote.orders().len(), 1);
541 assert_eq!(quote.orders()[0].status(), QuoteStatus::Success);
542 assert_eq!(*quote.orders()[0].amount_out_net_gas(), BigUint::from(900u64));
543 assert!(!quote.orders()[0]
544 .transaction()
545 .unwrap()
546 .data()
547 .is_empty());
548
549 drop(worker_router);
550 worker.abort();
551 }
552
553 #[tokio::test]
554 async fn test_router_selects_best_of_two() {
555 let (pool_a, worker_a) = create_mock_pool("pool_a", Ok(make_single_quote(800)), 0);
557 let (pool_b, worker_b) = create_mock_pool("pool_b", Ok(make_single_quote(950)), 0);
559
560 let config = WorkerPoolRouterConfig::default().with_min_responses(2);
562 let worker_router = WorkerPoolRouter::new(vec![pool_a, pool_b], config, default_encoder());
563 let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
564 let request = QuoteRequest::new(vec![make_order()], options);
565
566 let result = worker_router.quote(request).await;
567 assert!(result.is_ok());
568
569 let quote = result.unwrap();
570 assert_eq!(quote.orders().len(), 1);
571 assert_eq!(*quote.orders()[0].amount_out_net_gas(), BigUint::from(950u64));
573 assert!(!quote.orders()[0]
574 .transaction()
575 .unwrap()
576 .data()
577 .is_empty());
578
579 drop(worker_router);
580 worker_a.abort();
581 worker_b.abort();
582 }
583
584 #[tokio::test]
585 async fn test_router_timeout() {
586 let (pool, worker) = create_mock_pool("slow_pool", Ok(make_single_quote(900)), 500);
588
589 let config = WorkerPoolRouterConfig::default().with_timeout(Duration::from_millis(50));
590 let worker_router = WorkerPoolRouter::new(vec![pool], config, default_encoder());
591 let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
592
593 let result = worker_router.quote(request).await;
594 assert!(result.is_ok());
595
596 let quote = result.unwrap();
597 assert_eq!(quote.orders().len(), 1);
599 assert!(matches!(
600 quote.orders()[0].status(),
601 QuoteStatus::Timeout | QuoteStatus::NoRouteFound
602 ));
603
604 drop(worker_router);
605 worker.abort();
606 }
607
608 #[tokio::test]
609 async fn test_router_early_return_on_min_responses() {
610 let (pool_a, worker_a) = create_mock_pool("fast_pool", Ok(make_single_quote(800)), 0);
612 let (pool_b, worker_b) = create_mock_pool("slow_pool", Ok(make_single_quote(950)), 500);
614
615 let config = WorkerPoolRouterConfig::default()
616 .with_timeout(Duration::from_millis(1000))
617 .with_min_responses(1);
618 let worker_router = WorkerPoolRouter::new(vec![pool_a, pool_b], config, default_encoder());
619
620 let start = Instant::now();
621 let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
622 let request = QuoteRequest::new(vec![make_order()], options);
623
624 let result = worker_router.quote(request).await;
625 let elapsed = start.elapsed();
626
627 assert!(result.is_ok());
628 assert!(elapsed < Duration::from_millis(200));
630
631 let quote = result.unwrap();
633 assert_eq!(quote.orders().len(), 1);
634 assert_eq!(quote.orders()[0].status(), QuoteStatus::Success);
635 assert!(!quote.orders()[0]
637 .transaction()
638 .unwrap()
639 .data()
640 .is_empty());
641
642 drop(worker_router);
643 worker_a.abort();
644 worker_b.abort();
645 }
646
647 #[rstest]
648 #[case::under_limit(100, Some(200), true)]
649 #[case::at_limit(200, Some(200), true)]
650 #[case::over_limit(300, Some(200), false)]
651 #[case::no_limit(500, None, true)]
652 fn test_max_gas_constraint(
653 #[case] gas_estimate: u64,
654 #[case] max_gas: Option<u64>,
655 #[case] should_pass: bool,
656 ) {
657 let responses = OrderResponses {
658 order_id: "test".to_string(),
659 quotes: vec![(
660 "pool".to_string(),
661 OrderQuote::new(
662 "test".to_string(),
663 QuoteStatus::Success,
664 BigUint::from(1000u64),
665 BigUint::from(990u64),
666 BigUint::from(gas_estimate),
667 BigUint::from(900u64),
668 BlockInfo::new(1, "0x123".to_string(), 1000),
669 "test".to_string(),
670 Bytes::from(make_address(0xAA).as_ref()),
671 Bytes::from(make_address(0xAA).as_ref()),
672 ),
673 )],
674 failed_solvers: vec![],
675 };
676
677 let options = match max_gas {
678 Some(gas) => QuoteOptions::default().with_max_gas(BigUint::from(gas)),
679 None => QuoteOptions::default(),
680 };
681
682 let worker_router =
683 WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
684 let result = worker_router.select_best(&responses, &options);
685
686 if should_pass {
687 assert_eq!(result.status(), QuoteStatus::Success);
688 } else {
689 assert_eq!(result.status(), QuoteStatus::NoRouteFound);
690 }
691 }
692
693 #[tokio::test]
694 async fn test_router_captures_solver_errors() {
695 let (pool, worker) = create_mock_pool(
697 "error_pool",
698 Err(SolveError::NoRouteFound { order_id: "test-order".to_string() }),
699 0,
700 );
701
702 let worker_router =
703 WorkerPoolRouter::new(vec![pool], WorkerPoolRouterConfig::default(), default_encoder());
704 let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
705
706 let result = worker_router.quote(request).await;
707 assert!(result.is_ok());
708
709 let quote = result.unwrap();
710 assert_eq!(quote.orders().len(), 1);
711 assert_eq!(quote.orders()[0].status(), QuoteStatus::NoRouteFound);
713
714 drop(worker_router);
715 worker.abort();
716 }
717
/// When no quote arrived and every failure is a timeout, the status is `Timeout`.
#[test]
fn test_select_best_all_timeouts_returns_timeout_status() {
    let timed_out = |pool: &str| (pool.to_string(), SolveError::Timeout { elapsed_ms: 100 });
    let responses = OrderResponses {
        order_id: "test".to_string(),
        quotes: Vec::new(),
        failed_solvers: vec![timed_out("pool_a"), timed_out("pool_b")],
    };

    let worker_router =
        WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
    let options = QuoteOptions::default();
    let result = worker_router.select_best(&responses, &options);

    assert_eq!(result.status(), QuoteStatus::Timeout);
}
735
/// When no quote arrived and the failures are of mixed kinds, the status is `NoRouteFound`.
#[test]
fn test_select_best_mixed_failures_returns_no_route_found() {
    let timeout_failure = ("pool_a".to_string(), SolveError::Timeout { elapsed_ms: 100 });
    let no_route_failure =
        ("pool_b".to_string(), SolveError::NoRouteFound { order_id: "test".to_string() });
    let responses = OrderResponses {
        order_id: "test".to_string(),
        quotes: Vec::new(),
        failed_solvers: vec![timeout_failure, no_route_failure],
    };

    let worker_router =
        WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
    let options = QuoteOptions::default();
    let result = worker_router.select_best(&responses, &options);

    assert_eq!(result.status(), QuoteStatus::NoRouteFound);
}
754
/// With neither quotes nor failures recorded, the status is `NoRouteFound`.
#[test]
fn test_select_best_no_failures_returns_no_route_found() {
    let responses = OrderResponses {
        order_id: "test".to_string(),
        quotes: Vec::new(),
        failed_solvers: Vec::new(),
    };

    let worker_router =
        WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
    let options = QuoteOptions::default();
    let result = worker_router.select_best(&responses, &options);

    assert_eq!(result.status(), QuoteStatus::NoRouteFound);
}
767}