#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackendLimits {
pub read_rps: Option<u64>,
pub write_rps: Option<u64>,
pub read_bps: Option<u64>,
pub write_bps: Option<u64>,
}
impl BackendLimits {
pub const UNLIMITED: BackendLimits = BackendLimits {
read_rps: None,
write_rps: None,
read_bps: None,
write_bps: None,
};
}
#[must_use]
pub fn for_scheme(scheme: &str) -> BackendLimits {
match scheme {
"s3" => BackendLimits {
read_rps: Some(5500),
write_rps: Some(3500),
read_bps: None,
write_bps: None,
},
"gcs" | "gs" => BackendLimits {
read_rps: Some(5000),
write_rps: Some(1000),
read_bps: None,
write_bps: None,
},
"b2" => BackendLimits {
read_rps: Some(20),
write_rps: Some(50),
read_bps: Some(25 * 1024 * 1024),
write_bps: Some(100 * 1024 * 1024),
},
_ => BackendLimits::UNLIMITED,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transfer::RateLimiter;
use std::time::Duration;
fn runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.expect("build tokio runtime")
}
#[test]
fn limits_for_scheme_file_is_unlimited() {
assert_eq!(for_scheme("file"), BackendLimits::UNLIMITED);
let l = for_scheme("file");
assert_eq!(l.read_rps, None);
assert_eq!(l.write_rps, None);
assert_eq!(l.read_bps, None);
assert_eq!(l.write_bps, None);
}
#[test]
fn limits_for_scheme_unknown_is_unlimited() {
assert_eq!(for_scheme("mock"), BackendLimits::UNLIMITED);
assert_eq!(for_scheme("azure"), BackendLimits::UNLIMITED);
assert_eq!(for_scheme(""), BackendLimits::UNLIMITED);
}
#[test]
fn limits_for_scheme_s3() {
let l = for_scheme("s3");
assert_eq!(l.read_rps, Some(5500));
assert_eq!(l.write_rps, Some(3500));
assert_eq!(l.read_bps, None);
assert_eq!(l.write_bps, None);
}
#[test]
fn limits_for_scheme_gcs() {
let expected = BackendLimits {
read_rps: Some(5000),
write_rps: Some(1000),
read_bps: None,
write_bps: None,
};
assert_eq!(for_scheme("gcs"), expected);
assert_eq!(for_scheme("gs"), expected);
}
#[test]
fn limits_for_scheme_b2() {
let l = for_scheme("b2");
assert_eq!(l.read_rps, Some(20));
assert_eq!(l.write_rps, Some(50));
assert_eq!(l.read_bps, Some(25 * 1024 * 1024));
assert_eq!(l.write_bps, Some(100 * 1024 * 1024));
}
#[test]
fn limits_request_limiter_unlimited_is_noop() {
let rt = runtime();
rt.block_on(async {
let limiter = RateLimiter::new(None);
let start = tokio::time::Instant::now();
for _ in 0..1000 {
limiter.acquire(1).await; }
assert!(
start.elapsed() < Duration::from_millis(200),
"unlimited request limiter must not pace requests"
);
});
}
#[test]
fn limits_request_limiter_paces_requests() {
let rt = runtime();
rt.block_on(async {
let rps = 5;
let limiter = RateLimiter::new(Some(rps));
let start = tokio::time::Instant::now();
for _ in 0..(rps * 2) {
limiter.acquire(1).await; }
let elapsed = start.elapsed();
assert!(
elapsed >= Duration::from_millis(900),
"a request limiter fed {rps} req/s should pace 2x burst to ~1s, took {elapsed:?}"
);
});
}
}