pub mod config;
use std::{
collections::HashSet,
time::{Duration, Instant},
};
use config::WorkerPoolRouterConfig;
use futures::stream::{FuturesUnordered, StreamExt};
use metrics::{counter, histogram};
use num_bigint::BigUint;
use tracing::{debug, warn};
use tycho_simulation::tycho_common::Bytes;
use crate::{
encoding::encoder::Encoder, worker_pool::task_queue::TaskQueueHandle, BlockInfo, Order,
OrderQuote, Quote, QuoteOptions, QuoteRequest, QuoteStatus, SolveError,
};
/// Named handle to a single solver pool.
///
/// Pairs the pool's name (used in log fields and metric labels by the router) with the
/// [`TaskQueueHandle`] through which orders are enqueued for that pool.
#[derive(Clone)]
pub struct SolverPoolHandle {
    /// Name identifying the pool.
    name: String,
    /// Queue handle used to submit orders to the pool.
    queue: TaskQueueHandle,
}

impl SolverPoolHandle {
    /// Builds a handle from a pool name and the queue feeding that pool.
    pub fn new(name: impl Into<String>, queue: TaskQueueHandle) -> Self {
        let name = name.into();
        SolverPoolHandle { name, queue }
    }

    /// Returns the pool's name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns a reference to the pool's task queue handle.
    pub fn queue(&self) -> &TaskQueueHandle {
        let Self { queue, .. } = self;
        queue
    }
}
/// Everything gathered from the solver pools for one order.
#[derive(Debug)]
pub(crate) struct OrderResponses {
/// Identifier of the order these responses belong to.
order_id: String,
/// `(pool name, quote)` pairs, one per pool whose request returned `Ok`.
quotes: Vec<(String, OrderQuote)>,
/// `(pool name, error)` pairs for pools that returned an error or had not answered
/// when the deadline expired.
failed_solvers: Vec<(String, SolveError)>,
}
/// Routes quote requests to a set of solver pools and picks the best reply per order.
pub struct WorkerPoolRouter {
/// Pools that every order is fanned out to.
solver_pools: Vec<SolverPoolHandle>,
/// Supplies the default timeout and the default `min_responses` used when a request
/// does not override them.
config: WorkerPoolRouterConfig,
/// Applied to the selected quotes when the request carries encoding options.
encoder: Encoder,
}
impl WorkerPoolRouter {
pub fn new(
solver_pools: Vec<SolverPoolHandle>,
config: WorkerPoolRouterConfig,
encoder: Encoder,
) -> Self {
Self { solver_pools, config, encoder }
}
pub fn num_pools(&self) -> usize {
self.solver_pools.len()
}
/// Produces a [`Quote`] covering every order in `request`.
///
/// All orders are solved concurrently against a shared deadline. For each order the
/// best response is chosen by `select_best`, and when the request carries encoding
/// options the chosen quotes are passed through the encoder before the totals are
/// computed.
///
/// # Errors
///
/// Returns [`SolveError::Internal`] when no solver pools are configured, and
/// propagates any error produced by the encoder.
pub async fn quote(&self, request: QuoteRequest) -> Result<Quote, SolveError> {
    let started_at = Instant::now();
    let deadline = started_at + self.effective_timeout(request.options());
    // A per-request value overrides the configured default.
    let min_responses = match request.options().min_responses() {
        Some(requested) => requested,
        None => self.config.min_responses(),
    };

    if self.solver_pools.is_empty() {
        return Err(SolveError::Internal("no solver pools configured".to_string()));
    }

    // One future per order; `join_all` drives them concurrently and keeps order.
    let mut in_flight = Vec::new();
    for order in request.orders().iter() {
        in_flight.push(self.solve_order(order.clone(), deadline, min_responses));
    }
    let per_order_responses = futures::future::join_all(in_flight).await;

    let mut order_quotes: Vec<OrderQuote> = Vec::new();
    for responses in per_order_responses {
        order_quotes.push(self.select_best(&responses, request.options()));
    }

    if let Some(encoding_options) = request.options().encoding_options() {
        let encoded = self
            .encoder
            .encode(order_quotes, encoding_options.clone())
            .await?;
        order_quotes = encoded;
    }

    let mut total_gas_estimate = BigUint::ZERO;
    for order_quote in order_quotes.iter() {
        total_gas_estimate = total_gas_estimate + order_quote.gas_estimate();
    }

    let solve_time_ms = started_at.elapsed().as_millis() as u64;
    Ok(Quote::new(order_quotes, total_gas_estimate, solve_time_ms))
}
/// Fans `order` out to every solver pool and gathers the replies.
///
/// Collection stops as soon as one of the following happens:
/// - `deadline` is reached: every pool that has not answered yet is recorded in
///   `failed_solvers` as [`SolveError::Timeout`], carrying the time elapsed since this
///   call started;
/// - `min_responses` is non-zero and that many `Ok` replies have arrived (pools still
///   outstanding at that point are *not* recorded as failures);
/// - every pool has answered.
///
/// The deadline branch is checked first on each iteration (`biased`), so a reply that
/// becomes ready at the same time as the deadline is not collected.
///
/// Also records duration/response histograms and per-pool failure counters, and logs
/// failures at `warn` level.
async fn solve_order(
    &self,
    order: Order,
    deadline: Instant,
    min_responses: usize,
) -> OrderResponses {
    let start_time = Instant::now();
    let order_id = order.id().to_string();

    // One enqueue future per pool, each tagged with the pool name so replies can be
    // attributed regardless of completion order.
    let mut pending: FuturesUnordered<_> = self
        .solver_pools
        .iter()
        .map(|pool| {
            let order_clone = order.clone();
            let pool_name = pool.name().to_string();
            let queue = pool.queue().clone();
            async move {
                let result = queue.enqueue(order_clone).await;
                (pool_name, result)
            }
        })
        .collect();

    let mut quotes = Vec::new();
    let mut failed_solvers: Vec<(String, SolveError)> = Vec::new();
    // Pools that have not answered yet; whatever is left at the deadline timed out.
    let mut remaining_pools: HashSet<String> = self
        .solver_pools
        .iter()
        .map(|p| p.name().to_string())
        .collect();

    // The deadline is fixed for the whole call, so arm the timer once and poll the
    // same pinned future on every iteration instead of rebuilding it each time.
    let timeout = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline));
    tokio::pin!(timeout);

    loop {
        tokio::select! {
            biased;
            _ = &mut timeout => {
                // Report how long we actually waited. Measuring "deadline - now" here
                // would always saturate to zero because the deadline has just passed.
                let elapsed_ms = start_time.elapsed().as_millis() as u64;
                failed_solvers.extend(
                    remaining_pools
                        .drain()
                        .map(|pool_name| (pool_name, SolveError::Timeout { elapsed_ms })),
                );
                break;
            }
            result = pending.next() => {
                match result {
                    Some((pool_name, Ok(single_quote))) => {
                        remaining_pools.remove(&pool_name);
                        quotes.push((pool_name, single_quote.order().clone()));
                        if min_responses > 0 && quotes.len() >= min_responses {
                            debug!(
                                order_id = %order_id,
                                responses = quotes.len(),
                                min_responses,
                                "early return: min_responses reached"
                            );
                            counter!("worker_router_early_returns_total").increment(1);
                            break;
                        }
                    }
                    Some((pool_name, Err(e))) => {
                        remaining_pools.remove(&pool_name);
                        warn!(
                            pool = %pool_name,
                            order_id = %order_id,
                            error = %e,
                            "solver pool failed"
                        );
                        failed_solvers.push((pool_name, e));
                    }
                    // Every pool has answered.
                    None => {
                        break;
                    }
                }
            }
        }
    }

    let duration = start_time.elapsed().as_secs_f64();
    histogram!("worker_router_solve_duration_seconds").record(duration);
    histogram!("worker_router_solver_responses").record(quotes.len() as f64);

    for (pool_name, error) in &failed_solvers {
        let error_type = match error {
            SolveError::Timeout { .. } => "timeout",
            SolveError::NoRouteFound { .. } => "no_route",
            SolveError::QueueFull => "queue_full",
            SolveError::Internal(_) => "internal",
            _ => "other",
        };
        counter!("worker_router_solver_failures_total", "pool" => pool_name.clone(), "error_type" => error_type).increment(1);
    }

    if !failed_solvers.is_empty() {
        let timeout_count = failed_solvers
            .iter()
            .filter(|(_, e)| matches!(e, SolveError::Timeout { .. }))
            .count();
        let other_count = failed_solvers.len() - timeout_count;
        warn!(
            order_id = %order_id,
            timeout_count,
            other_failures = other_count,
            "some solver pools failed"
        );
    }

    OrderResponses { order_id, quotes, failed_solvers }
}
/// Chooses the quote to return for one order.
///
/// Only quotes with [`QuoteStatus::Success`] that respect `options.max_gas()` (when
/// set) are eligible; among them the one with the highest `amount_out_net_gas` wins.
///
/// When nothing is eligible a placeholder quote is built instead:
/// - if at least one pool answered, a `NoRouteFound` quote reusing the first answer's
///   amount-in, block, sender and receiver;
/// - otherwise an all-zero quote whose status is `Timeout` if every failure was a
///   timeout, `NotReady` if every failure was "not ready", and `NoRouteFound` in all
///   other cases (including no failures at all).
///
/// Increments the `worker_router_orders_total` counter with the matching status label.
fn select_best(&self, responses: &OrderResponses, options: &QuoteOptions) -> OrderQuote {
    let winner = responses
        .quotes
        .iter()
        .filter(|(_, q)| {
            q.status() == QuoteStatus::Success
                && options
                    .max_gas()
                    .map(|max| q.gas_estimate() <= max)
                    .unwrap_or(true)
        })
        .max_by_key(|(_, q)| q.amount_out_net_gas());

    if let Some((pool_name, best)) = winner {
        counter!("worker_router_orders_total", "status" => "success").increment(1);
        counter!("worker_router_best_quote_pool", "pool" => pool_name.clone()).increment(1);
        debug!(
            order_id = %best.order_id(),
            pool = %pool_name,
            amount_out_net_gas = %best.amount_out_net_gas(),
            "selected best quote"
        );
        return best.clone();
    }

    match responses.quotes.first() {
        // Some pool answered, but no answer was usable: keep the request context
        // from that answer and zero out the amounts.
        Some((_, any_q)) => {
            counter!("worker_router_orders_total", "status" => "no_route").increment(1);
            OrderQuote::new(
                responses.order_id.clone(),
                QuoteStatus::NoRouteFound,
                any_q.amount_in().clone(),
                BigUint::ZERO,
                BigUint::ZERO,
                BigUint::ZERO,
                any_q.block().clone(),
                String::new(),
                any_q.sender().clone(),
                any_q.receiver().clone(),
            )
        }
        // Nobody answered: derive the status from the kinds of failure observed.
        None => {
            let failures = &responses.failed_solvers;
            let (status, status_label) = if failures.is_empty() {
                (QuoteStatus::NoRouteFound, "no_route")
            } else if failures
                .iter()
                .all(|(_, e)| matches!(e, SolveError::Timeout { .. }))
            {
                (QuoteStatus::Timeout, "timeout")
            } else if failures
                .iter()
                .all(|(_, e)| matches!(e, SolveError::NotReady(_)))
            {
                (QuoteStatus::NotReady, "not_ready")
            } else {
                (QuoteStatus::NoRouteFound, "no_route")
            };
            counter!("worker_router_orders_total", "status" => status_label).increment(1);
            OrderQuote::new(
                responses.order_id.clone(),
                status,
                BigUint::ZERO,
                BigUint::ZERO,
                BigUint::ZERO,
                BigUint::ZERO,
                BlockInfo::new(0, String::new(), 0),
                String::new(),
                Bytes::default(),
                Bytes::default(),
            )
        }
    }
}
/// Returns the timeout to apply to a request: the per-request `timeout_ms` when
/// present, otherwise the configured default.
fn effective_timeout(&self, options: &QuoteOptions) -> Duration {
    match options.timeout_ms() {
        Some(millis) => Duration::from_millis(millis),
        None => self.config.default_timeout(),
    }
}
}
// Unit tests for the router: configuration defaults, fan-out against mock pools,
// timeout / early-return behaviour, and best-quote selection.
#[cfg(test)]
mod tests {
use rstest::rstest;
use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry;
use tycho_simulation::{
tycho_common::models::Chain,
tycho_core::{
models::{token::Token, Address, Chain as SimChain},
Bytes,
},
};
use super::*;
use crate::{
algorithm::test_utils::{component, MockProtocolSim},
types::internal::SolveTask,
EncodingOptions, OrderSide, Route, SingleOrderQuote, Swap,
};
/// Builds an Ethereum `Encoder` backed by the default swap encoder registry.
fn default_encoder() -> Encoder {
let registry = SwapEncoderRegistry::new(Chain::Ethereum)
.add_default_encoders(None)
.expect("default encoders should always succeed");
Encoder::new(Chain::Ethereum, registry).expect("encoder creation should succeed")
}
/// Returns a 20-byte address with every byte set to `byte`.
fn make_address(byte: u8) -> Address {
Address::from([byte; 20])
}
/// Builds a sell order of 1000 units between addresses 0x01.. and 0x02.. with the
/// id `"test-order"`.
fn make_order() -> Order {
Order::new(
make_address(0x01),
make_address(0x02),
BigUint::from(1000u64),
OrderSide::Sell,
make_address(0xAA),
)
.with_id("test-order".to_string())
}
/// Builds a successful single-swap quote for `"test-order"` whose
/// `amount_out_net_gas` is the given value; all other amounts are fixed.
fn make_single_quote(amount_out_net_gas: u64) -> SingleOrderQuote {
let make_token = |addr: Address| Token {
address: addr,
symbol: "T".to_string(),
decimals: 18,
tax: Default::default(),
gas: vec![],
chain: SimChain::Ethereum,
quality: 100,
};
let tin = make_address(0x01);
let tout = make_address(0x02);
let swap = Swap::new(
"pool-1".to_string(),
"uniswap_v2".to_string(),
tin.clone(),
tout.clone(),
BigUint::from(1000u64),
BigUint::from(990u64),
BigUint::from(50_000u64),
component(
"0x0000000000000000000000000000000000000001",
&[make_token(tin), make_token(tout)],
),
Box::new(MockProtocolSim::default()),
);
let quote = OrderQuote::new(
"test-order".to_string(),
QuoteStatus::Success,
BigUint::from(1000u64),
BigUint::from(990u64),
BigUint::from(100_000u64),
BigUint::from(amount_out_net_gas),
BlockInfo::new(1, "0x123".to_string(), 1000),
"test".to_string(),
Bytes::from(make_address(0xAA).as_ref()),
Bytes::from(make_address(0xAA).as_ref()),
)
.with_route(Route::new(vec![swap]));
SingleOrderQuote::new(quote, 5)
}
/// Creates a mock solver pool named `name`.
///
/// A spawned task answers every received `SolveTask` with a clone of `response`,
/// sleeping `delay_ms` milliseconds first when `delay_ms > 0`. Returns the pool
/// handle together with the worker's `JoinHandle` so the test can abort it.
fn create_mock_pool(
name: &str,
response: Result<SingleOrderQuote, SolveError>,
delay_ms: u64,
) -> (SolverPoolHandle, tokio::task::JoinHandle<()>) {
let (tx, rx) = async_channel::bounded::<SolveTask>(10);
let handle = TaskQueueHandle::from_sender(tx);
let worker = tokio::spawn(async move {
// Runs until the channel is closed or the task is aborted.
while let Ok(task) = rx.recv().await {
if delay_ms > 0 {
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}
task.respond(response.clone());
}
});
(SolverPoolHandle::new(name, handle), worker)
}
/// The default config uses a 1 s timeout and `min_responses == 1`.
#[test]
fn test_config_default() {
let config = WorkerPoolRouterConfig::default();
assert_eq!(config.default_timeout(), Duration::from_secs(1));
assert_eq!(config.min_responses(), 1);
}
/// The builder-style setters override timeout and `min_responses`.
#[test]
fn test_config_builder() {
let config = WorkerPoolRouterConfig::default()
.with_timeout(Duration::from_millis(500))
.with_min_responses(2);
assert_eq!(config.default_timeout(), Duration::from_millis(500));
assert_eq!(config.min_responses(), 2);
}
/// A router without pools rejects requests with `SolveError::Internal`.
#[tokio::test]
async fn test_router_no_pools() {
let worker_router =
WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
let result = worker_router.quote(request).await;
assert!(matches!(result, Err(SolveError::Internal(_))));
}
/// With one responsive pool the quote succeeds, carries the pool's
/// `amount_out_net_gas`, and has non-empty encoded transaction data.
#[tokio::test]
async fn test_router_single_pool_success() {
let (pool, worker) = create_mock_pool("pool_a", Ok(make_single_quote(900)), 0);
let worker_router =
WorkerPoolRouter::new(vec![pool], WorkerPoolRouterConfig::default(), default_encoder());
let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
let request = QuoteRequest::new(vec![make_order()], options);
let result = worker_router.quote(request).await;
assert!(result.is_ok());
let quote = result.unwrap();
assert_eq!(quote.orders().len(), 1);
assert_eq!(quote.orders()[0].status(), QuoteStatus::Success);
assert_eq!(*quote.orders()[0].amount_out_net_gas(), BigUint::from(900u64));
assert!(!quote.orders()[0]
.transaction()
.unwrap()
.data()
.is_empty());
drop(worker_router);
worker.abort();
}
/// With `min_responses == 2` both pools are awaited and the higher
/// `amount_out_net_gas` (950) is selected.
#[tokio::test]
async fn test_router_selects_best_of_two() {
let (pool_a, worker_a) = create_mock_pool("pool_a", Ok(make_single_quote(800)), 0);
let (pool_b, worker_b) = create_mock_pool("pool_b", Ok(make_single_quote(950)), 0);
let config = WorkerPoolRouterConfig::default().with_min_responses(2);
let worker_router = WorkerPoolRouter::new(vec![pool_a, pool_b], config, default_encoder());
let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
let request = QuoteRequest::new(vec![make_order()], options);
let result = worker_router.quote(request).await;
assert!(result.is_ok());
let quote = result.unwrap();
assert_eq!(quote.orders().len(), 1);
assert_eq!(*quote.orders()[0].amount_out_net_gas(), BigUint::from(950u64));
assert!(!quote.orders()[0]
.transaction()
.unwrap()
.data()
.is_empty());
drop(worker_router);
worker_a.abort();
worker_b.abort();
}
/// A pool that answers after 500 ms against a 50 ms timeout still yields `Ok`, with
/// the order marked `Timeout` or `NoRouteFound`.
#[tokio::test]
async fn test_router_timeout() {
let (pool, worker) = create_mock_pool("slow_pool", Ok(make_single_quote(900)), 500);
let config = WorkerPoolRouterConfig::default().with_timeout(Duration::from_millis(50));
let worker_router = WorkerPoolRouter::new(vec![pool], config, default_encoder());
let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
let result = worker_router.quote(request).await;
assert!(result.is_ok());
let quote = result.unwrap();
assert_eq!(quote.orders().len(), 1);
// NOTE(review): with a single timed-out pool `select_best` appears to always
// produce `Timeout`; confirm whether accepting `NoRouteFound` is still needed.
assert!(matches!(
quote.orders()[0].status(),
QuoteStatus::Timeout | QuoteStatus::NoRouteFound
));
drop(worker_router);
worker.abort();
}
/// With `min_responses == 1` the router returns once the fast pool has answered,
/// well before the slow pool's 500 ms delay.
#[tokio::test]
async fn test_router_early_return_on_min_responses() {
let (pool_a, worker_a) = create_mock_pool("fast_pool", Ok(make_single_quote(800)), 0);
let (pool_b, worker_b) = create_mock_pool("slow_pool", Ok(make_single_quote(950)), 500);
let config = WorkerPoolRouterConfig::default()
.with_timeout(Duration::from_millis(1000))
.with_min_responses(1);
let worker_router = WorkerPoolRouter::new(vec![pool_a, pool_b], config, default_encoder());
let start = Instant::now();
let options = QuoteOptions::default().with_encoding_options(EncodingOptions::new(0.01));
let request = QuoteRequest::new(vec![make_order()], options);
let result = worker_router.quote(request).await;
let elapsed = start.elapsed();
assert!(result.is_ok());
// Wall-clock bound: must finish long before the slow pool would respond.
assert!(elapsed < Duration::from_millis(200));
let quote = result.unwrap();
assert_eq!(quote.orders().len(), 1);
assert_eq!(quote.orders()[0].status(), QuoteStatus::Success);
assert!(!quote.orders()[0]
.transaction()
.unwrap()
.data()
.is_empty());
drop(worker_router);
worker_a.abort();
worker_b.abort();
}
/// `select_best` keeps a quote whose gas estimate is at or below `max_gas` (or when
/// no limit is set) and falls back to `NoRouteFound` when it exceeds the limit.
#[rstest]
#[case::under_limit(100, Some(200), true)]
#[case::at_limit(200, Some(200), true)]
#[case::over_limit(300, Some(200), false)]
#[case::no_limit(500, None, true)]
fn test_max_gas_constraint(
#[case] gas_estimate: u64,
#[case] max_gas: Option<u64>,
#[case] should_pass: bool,
) {
let responses = OrderResponses {
order_id: "test".to_string(),
quotes: vec![(
"pool".to_string(),
OrderQuote::new(
"test".to_string(),
QuoteStatus::Success,
BigUint::from(1000u64),
BigUint::from(990u64),
BigUint::from(gas_estimate),
BigUint::from(900u64),
BlockInfo::new(1, "0x123".to_string(), 1000),
"test".to_string(),
Bytes::from(make_address(0xAA).as_ref()),
Bytes::from(make_address(0xAA).as_ref()),
),
)],
failed_solvers: vec![],
};
let options = match max_gas {
Some(gas) => QuoteOptions::default().with_max_gas(BigUint::from(gas)),
None => QuoteOptions::default(),
};
// `select_best` does not touch the pools, so an empty router is sufficient.
let worker_router =
WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
let result = worker_router.select_best(&responses, &options);
if should_pass {
assert_eq!(result.status(), QuoteStatus::Success);
} else {
assert_eq!(result.status(), QuoteStatus::NoRouteFound);
}
}
/// A pool that answers with `NoRouteFound` does not fail the request; the order is
/// reported with `NoRouteFound` status.
#[tokio::test]
async fn test_router_captures_solver_errors() {
let (pool, worker) = create_mock_pool(
"error_pool",
Err(SolveError::NoRouteFound { order_id: "test-order".to_string() }),
0,
);
let worker_router =
WorkerPoolRouter::new(vec![pool], WorkerPoolRouterConfig::default(), default_encoder());
let request = QuoteRequest::new(vec![make_order()], QuoteOptions::default());
let result = worker_router.quote(request).await;
assert!(result.is_ok());
let quote = result.unwrap();
assert_eq!(quote.orders().len(), 1);
assert_eq!(quote.orders()[0].status(), QuoteStatus::NoRouteFound);
drop(worker_router);
worker.abort();
}
/// No quotes and only timeout failures yields `Timeout` status.
#[test]
fn test_select_best_all_timeouts_returns_timeout_status() {
let responses = OrderResponses {
order_id: "test".to_string(),
quotes: vec![],
failed_solvers: vec![
("pool_a".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
("pool_b".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
],
};
let worker_router =
WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
let result = worker_router.select_best(&responses, &QuoteOptions::default());
assert_eq!(result.status(), QuoteStatus::Timeout);
}
/// No quotes and a mix of failure kinds yields `NoRouteFound` status.
#[test]
fn test_select_best_mixed_failures_returns_no_route_found() {
let responses = OrderResponses {
order_id: "test".to_string(),
quotes: vec![],
failed_solvers: vec![
("pool_a".to_string(), SolveError::Timeout { elapsed_ms: 100 }),
("pool_b".to_string(), SolveError::NoRouteFound { order_id: "test".to_string() }),
],
};
let worker_router =
WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
let result = worker_router.select_best(&responses, &QuoteOptions::default());
assert_eq!(result.status(), QuoteStatus::NoRouteFound);
}
/// No quotes and no failures yields `NoRouteFound` status.
#[test]
fn test_select_best_no_failures_returns_no_route_found() {
let responses =
OrderResponses { order_id: "test".to_string(), quotes: vec![], failed_solvers: vec![] };
let worker_router =
WorkerPoolRouter::new(vec![], WorkerPoolRouterConfig::default(), default_encoder());
let result = worker_router.select_best(&responses, &QuoteOptions::default());
assert_eq!(result.status(), QuoteStatus::NoRouteFound);
}
}