use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use fd_lock::RwLock;
use super::config::Limits;
use super::outcome::{AttemptOutcome, RunResult, Snapshot};
use super::progress::{ProgressHook, RateLimitWait};
use super::store::StateStore;
use super::wait::{WaitTracker, snapshot_from_state};
use crate::Clock;
use crate::Result;
const LOCK_RETRY_MS: u64 = 50;
pub(super) const MAX_SLEEP_SECS: u64 = 80;
#[derive(Clone)]
pub struct RateLimiter {
pub(super) store: StateStore,
pub(super) limits: Limits,
pub(super) clock: Arc<dyn Clock>,
pub(super) progress_hook: Option<ProgressHook>,
}
impl RateLimiter {
pub fn new(
state_dir: PathBuf,
proxy: Option<&str>,
limits: Limits,
clock: Arc<dyn Clock>,
) -> Self {
Self {
store: StateStore::new(state_dir, proxy),
limits,
clock,
progress_hook: None,
}
}
#[must_use]
pub fn with_progress_hook(mut self, hook: Option<ProgressHook>) -> Self {
self.progress_hook = hook;
self
}
pub async fn run<F, Fut, T>(&self, no_wait: bool, f: F) -> Result<RunResult<T>>
where
F: FnOnce(Snapshot) -> Fut,
Fut: Future<Output = (AttemptOutcome, T)>,
{
let mut f_slot = Some(f);
let mut cooldown = WaitTracker::new();
let mut spacing = WaitTracker::new();
loop {
let file = self.store.open_lock_file()?;
let mut lock = RwLock::new(file);
let Ok(_guard) = lock.try_write() else {
self.clock.sleep(Duration::from_millis(LOCK_RETRY_MS)).await;
continue;
};
let now = self.clock.now();
let mut state = self.store.read_state(now);
let consecutive = state.consecutive_blocks;
if let Some(until) = state.blocked_until
&& until > now
{
self.store.write_state(&state)?;
drop(_guard);
drop(lock);
let dur = until - now;
self.wait_or_abort(
RateLimitWait::Cooldown,
dur,
no_wait,
&mut cooldown,
consecutive,
)
.await?;
continue;
}
if state.blocked_until.is_some() {
state.blocked_until = None;
self.store.write_state(&state)?;
}
if state.next_allowed_at > now {
let dur = state.next_allowed_at - now;
self.store.write_state(&state)?;
drop(_guard);
drop(lock);
self.wait_or_abort(
RateLimitWait::Spacing,
dur,
no_wait,
&mut spacing,
consecutive,
)
.await?;
continue;
}
let advance = self.limits.spacing(state.slowdown_until, now);
state.next_allowed_at = now + advance;
self.store.write_state(&state)?;
let preflight = snapshot_from_state(&state);
let f_taken = f_slot
.take()
.expect("FnOnce taken exactly once on the proceed path");
let (outcome, value) = f_taken(preflight.clone()).await;
self.update_post_flight(outcome, self.clock.now())?;
let final_state = self.store.read_state(self.clock.now());
return Ok(RunResult {
value,
snapshot: snapshot_from_state(&final_state),
outcome,
});
}
}
}