use std::time::{Duration, Instant};
use umbral_auth::{
Throttle, login_throttle_check, login_throttle_clear, register_throttle_check,
};
/// A key that has spent its whole budget is refused while still inside the window.
///
/// The throttle is built with a budget of 2 and a 60-second duration; all
/// three attempts are checked against the same instant.
#[test]
fn over_budget_attempt_is_denied() {
    let window = Duration::from_secs(60);
    let throttle = Throttle::new(2, window);
    let start = Instant::now();

    // Reusing one instant keeps every hit inside the same window.
    let first = throttle.check_at("k", start);
    assert!(first, "1st attempt allowed");

    let second = throttle.check_at("k", start);
    assert!(second, "2nd attempt allowed");

    let third = throttle.check_at("k", start);
    assert!(!third, "3rd in-window attempt denied (max=2)");
}
/// Exhausting one key's budget must leave a different key unaffected.
#[test]
fn distinct_keys_have_independent_budgets() {
    let limiter = Throttle::new(1, Duration::from_secs(60));
    let at = Instant::now();
    let (alice, bob) = ("ip-a|alice", "ip-b|bob");

    // Spend alice's single permitted attempt, then confirm she is refused.
    assert!(limiter.check_at(alice, at));
    let alice_retry = limiter.check_at(alice, at);
    assert!(!alice_retry, "alice exhausted");

    // Bob is checked at the same instant but under his own key.
    let bob_first = limiter.check_at(bob, at);
    assert!(
        bob_first,
        "bob has his own budget; one user's lockout never bleeds onto another"
    );
}
/// A key that was refused is admitted again once the window has passed.
#[test]
fn window_elapsing_re_allows_a_key() {
    let window = Duration::from_secs(60);
    let throttle = Throttle::new(1, window);
    let start = Instant::now();

    assert!(throttle.check_at("k", start));
    let repeat = throttle.check_at("k", start);
    assert!(!repeat, "second within the window denied");

    // One second beyond the 60-second window, i.e. 61 seconds after `start`.
    let after_window = start + (window + Duration::from_secs(1));
    let readmitted = throttle.check_at("k", after_window);
    assert!(
        readmitted,
        "after the window elapses the stale hit is pruned and the key is allowed again"
    );
}
/// Clearing a refused key lets it through again at the very same instant.
#[test]
fn clear_resets_a_key() {
    let throttle = Throttle::new(1, Duration::from_secs(60));
    let instant = Instant::now();
    let key = "k";

    // Use up the budget of one and confirm the key is now refused.
    assert!(throttle.check_at(key, instant));
    assert!(!throttle.check_at(key, instant));

    throttle.clear_at(key);

    // No time has advanced, so only the clear can explain the re-admission.
    let after_clear = throttle.check_at(key, instant);
    assert!(after_clear, "clear forgives the counter");
}
/// The login helper admits five rapid attempts, refuses the sixth, and admits
/// again after `login_throttle_clear` is called for the same IP and username.
#[test]
fn login_helper_denies_after_default_budget_then_clear_forgives() {
    // NOTE(review): dedicated IP/username for this test; presumably the helper
    // keeps process-wide state, so sharing keys across tests would interfere.
    let (ip, user) = ("203.0.113.7", "throttle_login_victim");

    (0..5).for_each(|i| {
        assert!(
            login_throttle_check(ip, user),
            "attempt {i} within the budget should be allowed"
        );
    });

    let sixth = login_throttle_check(ip, user);
    assert!(
        !sixth,
        "the 6th rapid login attempt must be denied (defends credential stuffing)"
    );

    login_throttle_clear(ip, user);

    let after_clear = login_throttle_check(ip, user);
    assert!(
        after_clear,
        "after a successful login clears the counter, the next attempt is allowed again"
    );
}
/// From a single IP, exhausting one username's login budget must not affect
/// a different username.
#[test]
fn login_helper_keys_per_ip_and_username() {
    let ip = "198.51.100.42";
    let exhausted_user = "throttle_keying_a";
    let fresh_user = "throttle_keying_b";

    // Five admitted attempts, then the sixth is refused for the first username.
    (0..5).for_each(|_| assert!(login_throttle_check(ip, exhausted_user)));
    assert!(!login_throttle_check(ip, exhausted_user));

    let other_user_allowed = login_throttle_check(ip, fresh_user);
    assert!(
        other_user_allowed,
        "a different username from the same IP keeps its own budget"
    );
}
/// The register helper admits ten rapid attempts from one IP and refuses the
/// eleventh.
#[test]
fn register_helper_denies_after_default_budget() {
    // NOTE(review): dedicated IP for this test; presumably the helper keeps
    // process-wide state, so sharing it across tests would interfere.
    let ip = "192.0.2.99";

    (0..10).for_each(|i| {
        assert!(
            register_throttle_check(ip),
            "register attempt {i} within budget allowed"
        );
    });

    let eleventh = register_throttle_check(ip);
    assert!(
        !eleventh,
        "the 11th rapid register from one IP must be denied (defends mass signup)"
    );
}