#![cfg(feature = "provider")]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use alloy_chains::NamedChain;
use alloy_provider::Provider;
use semioscan::provider::{ChainEndpoint, ProviderPoolBuilder};
use semioscan::SemioscanConfigBuilder;
use tokio::net::{TcpListener, TcpStream};
async fn spawn_stalled_listener() -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr: SocketAddr = listener.local_addr().expect("local_addr");
let url = format!("http://{addr}");
let parked: Arc<tokio::sync::Mutex<Vec<TcpStream>>> = Arc::new(tokio::sync::Mutex::new(vec![]));
tokio::spawn({
let parked = Arc::clone(&parked);
async move {
loop {
let Ok((stream, _peer)) = listener.accept().await else {
break;
};
parked.lock().await.push(stream);
}
}
});
url
}
async fn drain_two_concurrent_pool_requests(
chain_rate_limit_delay: Option<Duration>,
transport_timeout: Duration,
) -> Duration {
let url = spawn_stalled_listener().await;
let mut builder = SemioscanConfigBuilder::with_defaults()
.chain_timeout(NamedChain::Mainnet, transport_timeout);
if let Some(d) = chain_rate_limit_delay {
builder = builder.chain_rate_limit(NamedChain::Mainnet, d);
}
let config = builder.build();
let pool = ProviderPoolBuilder::new()
.add_chain(NamedChain::Mainnet, &url)
.with_rpc_policy(&config)
.build()
.expect("pool built");
let provider = pool.get(NamedChain::Mainnet).expect("provider present");
let start = Instant::now();
let (a, b) = tokio::join!(provider.get_block_number(), provider.get_block_number());
let elapsed = start.elapsed();
assert!(a.is_err(), "first request should time out: {a:?}");
assert!(b.is_err(), "second request should time out: {b:?}");
elapsed
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pool_with_rpc_policy_honors_chain_rate_limit_delay() {
let delay = Duration::from_millis(400);
let transport_timeout = Duration::from_millis(150);
let baseline = drain_two_concurrent_pool_requests(None, transport_timeout).await;
let throttled = drain_two_concurrent_pool_requests(Some(delay), transport_timeout).await;
let minimum_gap = delay / 2;
let actual_gap = throttled.saturating_sub(baseline);
assert!(
actual_gap >= minimum_gap,
"pool ignored policy rate_limit_delay: baseline={baseline:?}, \
throttled={throttled:?}, gap={actual_gap:?} < required {minimum_gap:?} \
(rate_limit_delay={delay:?})"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn endpoint_min_delay_overrides_policy_rate_limit_delay() {
let url = spawn_stalled_listener().await;
let endpoint_delay = Duration::from_millis(400);
let policy_delay = Duration::from_millis(50);
let transport_timeout = Duration::from_millis(150);
let config = SemioscanConfigBuilder::with_defaults()
.chain_timeout(NamedChain::Mainnet, transport_timeout)
.chain_rate_limit(NamedChain::Mainnet, policy_delay)
.build();
let endpoint = ChainEndpoint::new(NamedChain::Mainnet, &url).with_min_delay(endpoint_delay);
let pool = ProviderPoolBuilder::new()
.add_endpoint(endpoint)
.with_rpc_policy(&config)
.build()
.expect("pool built");
let provider = pool.get(NamedChain::Mainnet).expect("provider present");
let start = Instant::now();
let (a, b) = tokio::join!(provider.get_block_number(), provider.get_block_number());
let elapsed = start.elapsed();
assert!(a.is_err(), "first request should time out: {a:?}");
assert!(b.is_err(), "second request should time out: {b:?}");
let minimum_total = transport_timeout + endpoint_delay / 2;
assert!(
elapsed >= minimum_total,
"endpoint min_delay did not override policy delay: elapsed={elapsed:?} \
< required {minimum_total:?} (endpoint_delay={endpoint_delay:?}, \
policy_delay={policy_delay:?})"
);
}