use std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
time::{Duration, Instant},
};
use prometheus::Counter;
use tokio::{runtime::Handle, sync::Mutex, task};
use tracing::{error, trace, warn};
use casper_types::{EraId, PublicKey};
use crate::types::{NodeId, ValidatorMatrix};
/// Time span over which unspent allowance may accumulate in the token bucket while idle.
const STORED_BUFFER_SECS: Duration = Duration::from_secs(2);
/// A limiter that divides peers into validators and non-validators, throttling only the latter.
///
/// Consumers obtain a [`LimiterHandle`] per peer via `create_handle` and call
/// `request_allowance` before consuming a resource.
#[derive(Debug)]
pub(super) struct Limiter {
    /// State shared between this limiter and all handles it creates.
    data: Arc<LimiterData>,
    /// Validator matrix used to classify peers as validators or non-validators.
    validator_matrix: ValidatorMatrix,
}
impl Limiter {
    /// Creates a new limiter.
    ///
    /// A `resources_per_second` of `0` disables throttling for non-validators entirely (see
    /// `LimiterHandle::request_allowance`). Total time spent waiting is accumulated into
    /// `wait_time_sec`.
    pub(super) fn new(
        resources_per_second: u32,
        wait_time_sec: Counter,
        validator_matrix: ValidatorMatrix,
    ) -> Self {
        Limiter {
            data: Arc::new(LimiterData::new(resources_per_second, wait_time_sec)),
            validator_matrix,
        }
    }

    /// Creates a handle for a connected peer.
    ///
    /// If `consensus_key` is given, the peer is recorded in the connected-validator map so that
    /// `is_validator_in_era` can resolve it later. A poisoned lock is logged and otherwise
    /// ignored; the handle is still returned.
    pub(super) fn create_handle(
        &self,
        peer_id: NodeId,
        consensus_key: Option<PublicKey>,
    ) -> LimiterHandle {
        if let Some(public_key) = consensus_key.clone() {
            match self.data.connected_validators.write() {
                Ok(mut connected_validators) => {
                    let _ = connected_validators.insert(peer_id, public_key);
                }
                Err(_) => {
                    error!(
                        "could not update connected validator data set of limiter, lock poisoned"
                    );
                }
            }
        }
        LimiterHandle {
            data: self.data.clone(),
            validator_matrix: self.validator_matrix.clone(),
            consumer_id: ConsumerId {
                _peer_id: peer_id,
                consensus_key,
            },
        }
    }

    /// Removes a peer from the connected-validator map (e.g. on disconnect).
    ///
    /// A poisoned lock is logged and the removal skipped.
    pub(super) fn remove_connected_validator(&self, peer_id: &NodeId) {
        match self.data.connected_validators.write() {
            Ok(mut connected_validators) => {
                let _ = connected_validators.remove(peer_id);
            }
            Err(_) => {
                error!(
                    "could not remove connected validator from data set of limiter, lock poisoned"
                );
            }
        }
    }

    /// Returns whether the given connected peer is a validator in the given era.
    ///
    /// Returns `false` if the peer is unknown, the lock is poisoned, or validator weights for
    /// the era are missing (the latter also logs a warning).
    pub(super) fn is_validator_in_era(&self, era: EraId, peer_id: &NodeId) -> bool {
        let public_key = match self.data.connected_validators.read() {
            Ok(connected_validators) => match connected_validators.get(peer_id) {
                None => return false,
                Some(public_key) => public_key.clone(),
            },
            Err(_) => {
                error!("could not read from connected_validators of limiter, lock poisoned");
                return false;
            }
        };
        match self.validator_matrix.is_validator_in_era(era, &public_key) {
            None => {
                warn!(%era, "missing validator weights for given era");
                false
            }
            Some(is_validator) => is_validator,
        }
    }

    /// Debug helper: returns the currently unspent allowance of the shared token bucket.
    ///
    /// NOTE(review): `task::block_in_place` panics on a current-thread tokio runtime; only call
    /// this from a multi-threaded runtime.
    pub(super) fn debug_inspect_unspent_allowance(&self) -> Option<i64> {
        Some(task::block_in_place(move || {
            Handle::current().block_on(async move { self.data.resources.lock().await.available })
        }))
    }

    /// Debug helper: returns the validator key sets for the given era and its successor.
    pub(super) fn debug_inspect_validators(
        &self,
        current_era: &EraId,
    ) -> Option<(HashSet<PublicKey>, HashSet<PublicKey>)> {
        Some((
            self.validator_keys_for_era(current_era),
            self.validator_keys_for_era(&current_era.successor()),
        ))
    }

    /// Returns the set of validator public keys for the given era, or an empty set if weights
    /// for that era are not known to the matrix.
    fn validator_keys_for_era(&self, era: &EraId) -> HashSet<PublicKey> {
        self.validator_matrix
            .validator_weights(*era)
            .map(|validator_weights| validator_weights.validator_public_keys().cloned().collect())
            .unwrap_or_default()
    }
}
/// State shared between a `Limiter` and all of its handles.
#[derive(Debug)]
struct LimiterData {
    /// Number of resource units replenished per second; `0` means unlimited.
    resources_per_second: u32,
    /// Mapping of connected peer IDs to the consensus keys they registered with.
    connected_validators: RwLock<HashMap<NodeId, PublicKey>>,
    /// The token bucket shared by all non-validator consumers (guarded by an async mutex so it
    /// can be held across awaits).
    resources: Mutex<ResourceData>,
    /// Prometheus counter accumulating the total time spent waiting, in seconds.
    wait_time_sec: Counter,
}
/// State of the token bucket used to throttle non-validators.
#[derive(Debug)]
struct ResourceData {
    /// Remaining allowance in resource units; may go negative after a request is deducted and is
    /// paid off by subsequent waiting.
    available: i64,
    /// Instant at which the bucket was last refilled.
    last_refill: Instant,
}
impl LimiterData {
fn new(resources_per_second: u32, wait_time_sec: Counter) -> Self {
LimiterData {
resources_per_second,
connected_validators: Default::default(),
resources: Mutex::new(ResourceData {
available: 0,
last_refill: Instant::now(),
}),
wait_time_sec,
}
}
}
/// Classification of a peer, used to decide whether its requests are throttled.
enum PeerClass {
    /// A peer whose consensus key is an active or upcoming validator; not throttled.
    Validator,
    /// Any other peer; subject to the shared token bucket.
    NonValidator,
}
/// A per-peer handle for requesting resource allowances from a `Limiter`.
#[derive(Debug)]
pub(super) struct LimiterHandle {
    /// Limiter state shared with the creating `Limiter` and its other handles.
    data: Arc<LimiterData>,
    /// Validator matrix used to classify the consumer on every request.
    validator_matrix: ValidatorMatrix,
    /// Identity of the consuming peer.
    consumer_id: ConsumerId,
}
impl LimiterHandle {
    /// Waits until the caller is allowed to consume `amount` resource units, then records the
    /// consumption.
    ///
    /// Validators are never delayed. Non-validators draw from a single shared token bucket that
    /// refills at `resources_per_second`, buffering at most `STORED_BUFFER_SECS` worth of unspent
    /// allowance. The deduction happens *after* any waiting, so the bucket may go negative; the
    /// resulting debt is waited off by subsequent callers.
    pub(super) async fn request_allowance(&self, amount: u32) {
        // While no validators are known at all, do not limit anyone.
        if self.validator_matrix.is_empty() {
            trace!("empty set of validators, not limiting resources at all");
            return;
        }
        // Classify the peer: only a consensus key that is an active or upcoming validator counts.
        let peer_class = if let Some(ref public_key) = self.consumer_id.consensus_key {
            if self
                .validator_matrix
                .is_active_or_upcoming_validator(public_key)
            {
                PeerClass::Validator
            } else {
                PeerClass::NonValidator
            }
        } else {
            PeerClass::NonValidator
        };
        match peer_class {
            PeerClass::Validator => {
                // Validators are exempt from throttling.
            }
            PeerClass::NonValidator => {
                // A rate of zero means unlimited.
                if self.data.resources_per_second == 0 {
                    return;
                }
                // Cap on how much unspent allowance may accumulate while idle.
                let max_stored_resource = ((self.data.resources_per_second as f64)
                    * STORED_BUFFER_SECS.as_secs_f64())
                    as u32;
                // The async mutex is held across the sleeps below, so non-validator requests are
                // effectively processed one at a time against the shared bucket.
                {
                    let mut resources = self.data.resources.lock().await;
                    while resources.available < 0 {
                        // Credit the bucket with allowance accrued since the last refill.
                        let now = Instant::now();
                        let elapsed = now - resources.last_refill;
                        resources.last_refill = now;
                        resources.available += ((elapsed.as_nanos()
                            * self.data.resources_per_second as u128)
                            / 1_000_000_000) as i64;
                        // Never buffer more than the configured maximum.
                        resources.available = resources.available.min(max_stored_resource as i64);
                        if resources.available < 0 {
                            // Still in debt: estimate the time until the deficit is paid off,
                            // sleep that long, and account the wait in the metric.
                            let estimated_time_remaining = Duration::from_millis(
                                (-resources.available) as u64 * 1000
                                    / self.data.resources_per_second as u64,
                            );
                            tokio::time::sleep(estimated_time_remaining).await;
                            self.data
                                .wait_time_sec
                                .inc_by(estimated_time_remaining.as_secs_f64());
                        }
                    }
                    // Deduct after waiting; this may push the balance negative for later callers.
                    resources.available -= amount as i64;
                }
            }
        }
    }
}
/// Identity of a resource consumer.
#[derive(Debug)]
struct ConsumerId {
    /// The peer's node ID (currently unused, hence the underscore prefix).
    _peer_id: NodeId,
    /// The peer's consensus key, if it supplied one.
    consensus_key: Option<PublicKey>,
}
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use casper_types::{ChainNameDigest, EraId, SecretKey};
    use num_rational::Ratio;
    use prometheus::Counter;
    use tokio::time::Instant;

    use super::{Limiter, NodeId, PublicKey};
    use crate::{testing::init_logging, types::ValidatorMatrix};

    /// Maximum time a set of requests may take while still counting as "not limited".
    const SHORT_TIME: Duration = Duration::from_millis(250);

    /// Creates a fresh wait-time counter for a single test.
    fn new_wait_time_sec() -> Counter {
        Counter::new("test_time_waiting", "wait time counter used in tests")
            .expect("could not create new counter")
    }

    /// A limiter constructed with a rate of 0 should never delay any peer.
    #[tokio::test]
    async fn unlimited_limiter_is_unlimited() {
        let mut rng = crate::new_rng();

        let validator_matrix =
            ValidatorMatrix::new_with_validator(Arc::new(SecretKey::random(&mut rng)));
        let limiter = Limiter::new(0, new_wait_time_sec(), validator_matrix);

        // One handle with a (random) consensus key, one without; neither should be throttled.
        let handles = vec![
            limiter.create_handle(NodeId::random(&mut rng), Some(PublicKey::random(&mut rng))),
            limiter.create_handle(NodeId::random(&mut rng), None),
        ];

        for handle in handles {
            let start = Instant::now();
            handle.request_allowance(0).await;
            handle.request_allowance(u32::MAX).await;
            handle.request_allowance(1).await;
            assert!(start.elapsed() < SHORT_TIME);
        }
    }

    /// A peer presenting the matrix's own validator key is never throttled.
    #[tokio::test]
    async fn active_validator_is_unlimited() {
        let mut rng = crate::new_rng();

        let secret_key = SecretKey::random(&mut rng);
        let consensus_key = PublicKey::from(&secret_key);
        let validator_matrix = ValidatorMatrix::new_with_validator(Arc::new(secret_key));
        let limiter = Limiter::new(1_000, new_wait_time_sec(), validator_matrix);

        let handle = limiter.create_handle(NodeId::random(&mut rng), Some(consensus_key));

        let start = Instant::now();
        handle.request_allowance(0).await;
        handle.request_allowance(u32::MAX).await;
        handle.request_allowance(1).await;
        assert!(start.elapsed() < SHORT_TIME);
    }

    /// Peers that are not validators (unknown key, or no key at all) are rate limited.
    #[tokio::test]
    async fn inactive_validator_limited() {
        let rng = &mut crate::new_rng();

        let validator_matrix =
            ValidatorMatrix::new_with_validator(Arc::new(SecretKey::random(rng)));
        let peers = [
            // A random key, unknown to the matrix.
            (NodeId::random(rng), Some(PublicKey::random(rng))),
            // No consensus key supplied.
            (NodeId::random(rng), None),
        ];

        let limiter = Limiter::new(1_000, new_wait_time_sec(), validator_matrix);

        for (peer, maybe_public_key) in peers {
            let start = Instant::now();
            let handle = limiter.create_handle(peer, maybe_public_key);

            // 9_001 total units at 1_000/s; the first request is deducted without waiting, so
            // the whole sequence should take roughly 9 seconds.
            handle.request_allowance(1000).await;
            handle.request_allowance(1000).await;
            handle.request_allowance(1000).await;
            handle.request_allowance(2000).await;
            handle.request_allowance(4000).await;
            handle.request_allowance(1).await;
            let elapsed = start.elapsed();

            assert!(
                elapsed >= Duration::from_secs(9),
                "{}s",
                elapsed.as_secs_f64()
            );
            assert!(
                elapsed <= Duration::from_secs(10),
                "{}s",
                elapsed.as_secs_f64()
            );
        }
    }

    /// Multiple non-validators share a single token bucket, so their combined throughput is
    /// still capped at the configured rate.
    #[tokio::test]
    async fn nonvalidators_parallel_limited() {
        let mut rng = crate::new_rng();

        let wait_metric = new_wait_time_sec();
        let validator_matrix =
            ValidatorMatrix::new_with_validator(Arc::new(SecretKey::random(&mut rng)));
        let limiter = Limiter::new(1_000, wait_metric.clone(), validator_matrix);

        let start = Instant::now();

        // 5 tasks requesting 1_001 units each: 5_005 units total at 1_000/s, roughly 5 seconds.
        let join_handles = (0..5)
            .map(|_| {
                limiter.create_handle(NodeId::random(&mut rng), Some(PublicKey::random(&mut rng)))
            })
            .map(|handle| {
                tokio::spawn(async move {
                    handle.request_allowance(500).await;
                    handle.request_allowance(150).await;
                    handle.request_allowance(350).await;
                    handle.request_allowance(1).await;
                })
            });

        for join_handle in join_handles {
            join_handle.await.expect("could not join task");
        }

        let elapsed = start.elapsed();
        assert!(elapsed >= Duration::from_secs(5));
        assert!(elapsed <= Duration::from_secs(6));

        // The accumulated wait metric should roughly match the wall-clock time spent waiting.
        assert!(
            wait_metric.get() <= 6.0,
            "wait metric is too large: {}",
            wait_metric.get()
        );
        assert!(
            wait_metric.get() >= 4.5,
            "wait metric is too small: {}",
            wait_metric.get()
        );
    }

    /// With a validator matrix that knows no validator weights, nobody is limited.
    #[tokio::test]
    async fn inactive_validators_unlimited_when_no_validators_known() {
        init_logging();

        let mut rng = crate::new_rng();

        let secret_key = SecretKey::random(&mut rng);
        let consensus_key = PublicKey::from(&secret_key);
        let wait_metric = new_wait_time_sec();
        // The matrix is constructed without any validator weights (the `None` argument).
        let limiter = Limiter::new(
            1_000,
            wait_metric.clone(),
            ValidatorMatrix::new(
                Ratio::new(1, 3),
                ChainNameDigest::from_chain_name("casper-example"),
                None,
                EraId::from(0),
                Arc::new(secret_key),
                consensus_key.clone(),
                2,
                3,
            ),
        );

        let handles = vec![
            limiter.create_handle(NodeId::random(&mut rng), Some(PublicKey::random(&mut rng))),
            limiter.create_handle(NodeId::random(&mut rng), None),
        ];

        for handle in handles {
            let start = Instant::now();
            handle.request_allowance(1000).await;
            handle.request_allowance(1000).await;
            handle.request_allowance(1000).await;
            handle.request_allowance(2000).await;
            handle.request_allowance(4000).await;
            handle.request_allowance(1).await;
            assert!(start.elapsed() < SHORT_TIME);
        }

        // No waiting should have been recorded in the metric either.
        assert!(
            wait_metric.get() < SHORT_TIME.as_secs_f64(),
            "wait_metric is too large: {}",
            wait_metric.get()
        );
    }

    /// A validator's requests complete quickly even while a non-validator is being throttled.
    #[tokio::test]
    async fn throttling_of_non_validators_does_not_affect_validators() {
        init_logging();

        let mut rng = crate::new_rng();

        let secret_key = SecretKey::random(&mut rng);
        let consensus_key = PublicKey::from(&secret_key);
        let validator_matrix = ValidatorMatrix::new_with_validator(Arc::new(secret_key));
        let limiter = Limiter::new(1_000, new_wait_time_sec(), validator_matrix);

        let non_validator_handle = limiter.create_handle(NodeId::random(&mut rng), None);
        let validator_handle = limiter.create_handle(NodeId::random(&mut rng), Some(consensus_key));

        let start = Instant::now();

        // Consume 10_000 units in the background; the second request has to wait off the first
        // one's debt (~5s at 1_000/s).
        let background_nv_request = tokio::spawn(async move {
            non_validator_handle.request_allowance(5000).await;
            non_validator_handle.request_allowance(5000).await;

            Instant::now()
        });

        // Give the background task a head start, then request as a validator.
        tokio::time::sleep(Duration::from_secs(1)).await;

        validator_handle.request_allowance(10000).await;
        validator_handle.request_allowance(10000).await;
        let v_finished = Instant::now();

        let nv_finished = background_nv_request
            .await
            .expect("failed to join background nv task");

        let nv_completed = nv_finished.duration_since(start);
        assert!(
            nv_completed >= Duration::from_millis(4500),
            "non-validator did not delay sufficiently: {:?}",
            nv_completed
        );

        let v_completed = v_finished.duration_since(start);
        assert!(
            v_completed <= Duration::from_millis(1500),
            "validator did not finish quickly enough: {:?}",
            v_completed
        );
    }
}