use super::distiller::{ActivityRow, PgssRow, Snapshot, StatDatabaseRow, StatIoRow};
use super::queries::AllowedQuery;
use super::readonly_conn::ReadOnlyPgConn;
use anyhow::{Context, Result};
use std::collections::VecDeque;
use std::time::{Duration, Instant};
/// Resource limits a poller must stay within before backoff kicks in.
///
/// * `max_poll_ms` — ceiling, in milliseconds, on the rolling median wall time of a poll.
/// * `cpu_pct` — ceiling on the rolling ratio `sum(self_ms) / sum(interval_ms)`. Despite
///   the name this is compared as a fraction, not a percentage: `0.1` means 10%.
#[derive(Debug, Clone, Copy)]
pub struct Budget {
    pub max_poll_ms: u64,
    pub cpu_pct: f64,
}

impl Default for Budget {
    /// A 500 ms median poll time and a 0.1 self-time ratio.
    fn default() -> Self {
        Budget {
            cpu_pct: 0.1,
            max_poll_ms: 500,
        }
    }
}
/// Number of most recent polls kept in each rolling sample window.
pub const ROLLING_WINDOW: usize = 16;
/// Consecutive in-budget polls required before the sleep is halved again.
pub const RECOVERY_GOOD_POLLS: usize = 3;
/// Upper bound on the backed-off sleep (one minute).
pub const MAX_SLEEP: Duration = Duration::from_millis(60_000);
/// Lower bound on the sleep after recovery.
pub const MIN_SLEEP: Duration = Duration::from_micros(50_000);
/// Adaptive backoff controller for a periodic poller.
///
/// Keeps the last [`ROLLING_WINDOW`] samples of poll wall time, poll self time and the
/// time between polls. After each poll, [`record_and_plan`](Self::record_and_plan)
/// updates the sleep planned before the next poll:
///
/// * over budget (rolling median wall time above [`Budget::max_poll_ms`], or rolling
///   self-time / interval ratio above [`Budget::cpu_pct`]) — the sleep doubles, floored
///   at [`MIN_SLEEP`] and capped at the larger of [`MAX_SLEEP`] and the nominal interval;
/// * after [`RECOVERY_GOOD_POLLS`] consecutive in-budget polls — the sleep halves, never
///   dropping below the nominal interval or [`MIN_SLEEP`].
#[derive(Debug, Clone)]
pub struct BackpressureState {
    /// Configured (nominal) poll interval; also the recovery floor.
    interval: Duration,
    budget: Budget,
    /// Rolling poll wall times, in whole milliseconds.
    rolling_wall_ms: VecDeque<u64>,
    /// Rolling caller-reported self times, in whole milliseconds.
    rolling_self_ms: VecDeque<u64>,
    /// Rolling time between consecutive polls, in whole milliseconds.
    rolling_interval_ms: VecDeque<u64>,
    /// In-budget polls seen since the last over-budget poll or the last halving.
    consecutive_good: usize,
    /// Sleep to apply before the next poll.
    current_sleep: Duration,
}

impl BackpressureState {
    /// Creates a controller whose sleep starts at the nominal `interval`.
    pub fn new(interval: Duration, budget: Budget) -> Self {
        Self {
            interval,
            budget,
            rolling_wall_ms: VecDeque::with_capacity(ROLLING_WINDOW),
            rolling_self_ms: VecDeque::with_capacity(ROLLING_WINDOW),
            rolling_interval_ms: VecDeque::with_capacity(ROLLING_WINDOW),
            consecutive_good: 0,
            current_sleep: interval,
        }
    }

    /// Records one poll and re-plans the sleep before the next one.
    ///
    /// * `wall` — wall-clock duration of the poll.
    /// * `self_time` — self time spent on the poll. Its rolling sum divided by the rolling
    ///   sum of `interval_since_last` is the ratio checked against [`Budget::cpu_pct`].
    /// * `interval_since_last` — time elapsed since the previous poll.
    ///
    /// Returns a [`PollReport`] for this poll. Its `throttle_factor` is the planned sleep
    /// divided by the nominal interval.
    pub fn record_and_plan(
        &mut self,
        wall: Duration,
        self_time: Duration,
        interval_since_last: Duration,
    ) -> PollReport {
        push_bounded(&mut self.rolling_wall_ms, wall.as_millis() as u64);
        push_bounded(&mut self.rolling_self_ms, self_time.as_millis() as u64);
        push_bounded(
            &mut self.rolling_interval_ms,
            interval_since_last.as_millis() as u64,
        );
        let median_wall = median(&self.rolling_wall_ms);
        let cpu_pct = rolling_cpu_ratio(&self.rolling_self_ms, &self.rolling_interval_ms);
        let over_budget = median_wall > self.budget.max_poll_ms || cpu_pct > self.budget.cpu_pct;
        if over_budget {
            self.consecutive_good = 0;
            self.current_sleep = self.backed_off_sleep();
        } else {
            self.consecutive_good += 1;
            if self.consecutive_good >= RECOVERY_GOOD_POLLS {
                let halved = self.current_sleep / 2;
                self.current_sleep = halved.max(self.interval).max(MIN_SLEEP);
                self.consecutive_good = 0;
            }
        }
        PollReport {
            // NOTE(review): this is the time the report is built (after the poll), not the
            // poll's start time.
            t_wall_start: unix_epoch_seconds(),
            snapshot_duration_ms: wall.as_millis() as u64,
            cpu_pct_rolling: cpu_pct,
            throttle_factor: self.throttle_factor(),
        }
    }

    /// Sleep to apply before the next poll.
    pub fn current_sleep(&self) -> Duration {
        self.current_sleep
    }

    /// The configured poll interval this controller was created with.
    pub fn nominal_interval(&self) -> Duration {
        self.interval
    }

    /// Doubled sleep for an over-budget poll.
    ///
    /// Saturating so a huge interval cannot overflow-panic. Floored at [`MIN_SLEEP`] so a
    /// zero starting sleep can still grow. Capped at the larger of [`MAX_SLEEP`] and the
    /// nominal interval, so that backing off never makes polling faster than configured.
    fn backed_off_sleep(&self) -> Duration {
        let cap = MAX_SLEEP.max(self.interval);
        self.current_sleep
            .saturating_mul(2)
            .max(MIN_SLEEP)
            .min(cap)
    }

    /// `current_sleep / nominal interval`.
    ///
    /// A zero nominal interval is measured against [`MIN_SLEEP`] instead, which keeps the
    /// factor finite rather than NaN or infinite.
    fn throttle_factor(&self) -> f64 {
        let nominal = if self.interval.is_zero() {
            MIN_SLEEP
        } else {
            self.interval
        };
        self.current_sleep.as_secs_f64() / nominal.as_secs_f64()
    }
}
/// Per-poll summary returned by `record_and_plan`.
#[derive(Clone, Debug)]
pub struct PollReport {
    /// Unix time in seconds, read from the system clock when the report is built.
    /// NOTE(review): that happens after the poll finished, despite the field name.
    pub t_wall_start: f64,
    /// Wall time of the poll, in whole milliseconds.
    pub snapshot_duration_ms: u64,
    /// Rolling self-time / interval ratio (a fraction, not a percentage).
    pub cpu_pct_rolling: f64,
    /// Planned sleep divided by the nominal interval; `1.0` means no throttling.
    pub throttle_factor: f64,
}
pub struct Scraper {
conn: ReadOnlyPgConn,
interval: Duration,
budget: Budget,
rolling_wall_ms: VecDeque<u64>,
rolling_self_ms: VecDeque<u64>,
rolling_interval_ms: VecDeque<u64>,
consecutive_good: usize,
current_sleep: Duration,
}
impl Scraper {
pub fn new(conn: ReadOnlyPgConn, interval: Duration, budget: Budget) -> Self {
Self {
conn,
interval,
budget,
rolling_wall_ms: VecDeque::with_capacity(ROLLING_WINDOW),
rolling_self_ms: VecDeque::with_capacity(ROLLING_WINDOW),
rolling_interval_ms: VecDeque::with_capacity(ROLLING_WINDOW),
consecutive_good: 0,
current_sleep: interval,
}
}
pub async fn next_snapshot(&mut self) -> Result<(Snapshot, Duration)> {
let start = Instant::now();
let t_abs = unix_epoch_seconds();
let mut snap = Snapshot::default();
snap.t = t_abs;
for q in AllowedQuery::ALL.iter() {
let rows = match self.conn.query_allowed(*q).await {
Ok(r) => r,
Err(e) => {
if matches!(q, AllowedQuery::PgStatIoSnapshot) {
eprintln!(
"warning: {:?} query failed (likely PG <16); falling back to pg_stat_database: {}",
q, e
);
Vec::new()
} else {
return Err(e).with_context(|| format!("poll failed on {:?}", q));
}
}
};
decode_into_snapshot(*q, rows, &mut snap)?;
}
let wall = start.elapsed();
Ok((snap, wall))
}
pub fn record_and_plan(
&mut self,
wall: Duration,
self_time: Duration,
interval_since_last: Duration,
) -> PollReport {
push_bounded(&mut self.rolling_wall_ms, wall.as_millis() as u64);
push_bounded(&mut self.rolling_self_ms, self_time.as_millis() as u64);
push_bounded(
&mut self.rolling_interval_ms,
interval_since_last.as_millis() as u64,
);
let median_wall = median(&self.rolling_wall_ms);
let cpu_pct = rolling_cpu_ratio(&self.rolling_self_ms, &self.rolling_interval_ms);
let over_budget = median_wall > self.budget.max_poll_ms
|| cpu_pct > self.budget.cpu_pct;
if over_budget {
self.consecutive_good = 0;
self.current_sleep = (self.current_sleep * 2).min(MAX_SLEEP);
} else {
self.consecutive_good += 1;
if self.consecutive_good >= RECOVERY_GOOD_POLLS {
let halved = self.current_sleep / 2;
self.current_sleep = halved.max(self.interval).max(MIN_SLEEP);
self.consecutive_good = 0;
}
}
let throttle_factor = self.current_sleep.as_secs_f64() / self.interval.as_secs_f64();
PollReport {
t_wall_start: unix_epoch_seconds(),
snapshot_duration_ms: wall.as_millis() as u64,
cpu_pct_rolling: cpu_pct,
throttle_factor,
}
}
pub fn next_sleep(&self) -> Duration {
self.current_sleep
}
pub fn nominal_interval(&self) -> Duration {
self.interval
}
}
/// Appends `v` to the back of `dq`. If `dq` already holds exactly [`ROLLING_WINDOW`]
/// items, the oldest (front) one is evicted first.
fn push_bounded<T>(dq: &mut VecDeque<T>, v: T) {
    let at_capacity = dq.len() == ROLLING_WINDOW;
    if at_capacity {
        let _evicted = dq.pop_front();
    }
    dq.push_back(v);
}
/// Median of the samples in `dq`, or `0` when it is empty.
///
/// For an even number of samples this returns the upper of the two middle values.
fn median(dq: &VecDeque<u64>) -> u64 {
    let mut sorted = dq.clone();
    sorted.make_contiguous().sort_unstable();
    let mid = sorted.len() / 2;
    sorted.get(mid).copied().unwrap_or(0)
}
/// Ratio of total self time to total elapsed interval across the rolling windows.
///
/// Returns `0.0` when the summed interval is zero, which avoids dividing by zero.
fn rolling_cpu_ratio(self_ms: &VecDeque<u64>, interval_ms: &VecDeque<u64>) -> f64 {
    let busy = self_ms.iter().sum::<u64>() as f64;
    match interval_ms.iter().sum::<u64>() {
        0 => 0.0,
        elapsed => busy / elapsed as f64,
    }
}
/// Current system time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock reads earlier than the epoch.
fn unix_epoch_seconds() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        Err(_) => 0.0,
    }
}
/// Decodes the rows of one allow-listed query and appends them to the matching
/// [`Snapshot`] collection.
///
/// Decoding is deliberately lenient. A column that fails to decode (e.g. NULL where a
/// non-`Option` type is requested, or a type mismatch) falls back to `""`, `0` or `0.0`
/// instead of failing the poll. Negative integer counters are clamped to `0` before
/// conversion to `u64`. Column 0 of each query is not read.
///
/// NOTE(review): `query_id` is decoded as text. If the underlying SQL does not cast
/// `queryid` to text, every id silently decodes as `""` — confirm against `AllowedQuery`.
///
/// # Errors
/// Never returns `Err` at present; the `Result` leaves room for stricter decoding.
fn decode_into_snapshot(
    q: AllowedQuery,
    rows: Vec<tokio_postgres::Row>,
    snap: &mut Snapshot,
) -> Result<()> {
    match q {
        AllowedQuery::PgStatStatementsSnapshot => {
            for r in &rows {
                snap.pgss.push(PgssRow {
                    query_id: text_at(r, 1),
                    calls: counter_at(r, 2),
                    total_exec_time_ms: float_at(r, 3),
                });
            }
        }
        AllowedQuery::PgStatActivitySnapshot => {
            for r in &rows {
                snap.activity.push(ActivityRow {
                    wait_event_type: text_at(r, 1),
                    wait_event: text_at(r, 2),
                    // `state` is read as nullable; both a NULL and a decode error become "".
                    state: r
                        .try_get::<_, Option<String>>(3)
                        .ok()
                        .flatten()
                        .unwrap_or_default(),
                });
            }
        }
        AllowedQuery::PgStatIoSnapshot => {
            for r in &rows {
                snap.stat_io.push(StatIoRow {
                    backend_type: text_at(r, 1),
                    object: text_at(r, 2),
                    context: text_at(r, 3),
                    reads: counter_at(r, 4),
                    hits: counter_at(r, 5),
                    read_time_ms: float_at(r, 6),
                });
            }
        }
        AllowedQuery::PgStatDatabaseSnapshot => {
            for r in &rows {
                snap.stat_database.push(StatDatabaseRow {
                    datname: text_at(r, 1),
                    blks_hit: counter_at(r, 2),
                    blks_read: counter_at(r, 3),
                });
            }
        }
    }
    Ok(())
}

/// Text column `idx`, or `""` if it cannot be decoded as a `String`.
fn text_at(row: &tokio_postgres::Row, idx: usize) -> String {
    row.try_get::<_, String>(idx).unwrap_or_default()
}

/// `i64` column `idx` clamped to be non-negative; `0` if it cannot be decoded.
fn counter_at(row: &tokio_postgres::Row, idx: usize) -> u64 {
    row.try_get::<_, i64>(idx).unwrap_or(0).max(0) as u64
}

/// `f64` column `idx`, or `0.0` if it cannot be decoded.
fn float_at(row: &tokio_postgres::Row, idx: usize) -> f64 {
    row.try_get::<_, f64>(idx).unwrap_or(0.0)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh controller. This deliberately uses the production
    /// `BackpressureState`, not a test-local copy, so regressions in the real policy are
    /// caught.
    fn bp_state(interval: Duration, budget: Budget) -> BackpressureState {
        BackpressureState::new(interval, budget)
    }

    /// Budget where only wall time can trip backoff. The 1 ms / 100 ms self-time ratio
    /// used below (0.01) never exceeds a `cpu_pct` of 1.0.
    fn wall_only_budget() -> Budget {
        Budget {
            max_poll_ms: 50,
            cpu_pct: 1.0,
        }
    }

    #[test]
    fn doubles_under_sustained_slow_response() {
        let mut st = bp_state(Duration::from_millis(100), wall_only_budget());
        for _ in 0..ROLLING_WINDOW {
            st.record_and_plan(
                Duration::from_millis(200),
                Duration::from_millis(1),
                Duration::from_millis(100),
            );
        }
        assert!(
            st.current_sleep() > Duration::from_millis(100),
            "next-sleep should have doubled at least once under sustained slow response, got {:?}",
            st.current_sleep()
        );
    }

    #[test]
    fn recovers_once_rolling_median_drops() {
        let mut st = bp_state(Duration::from_millis(100), wall_only_budget());
        for _ in 0..ROLLING_WINDOW {
            st.record_and_plan(
                Duration::from_millis(200),
                Duration::from_millis(1),
                Duration::from_millis(100),
            );
        }
        let saturated = st.current_sleep();
        assert!(saturated > Duration::from_millis(100));
        for _ in 0..(ROLLING_WINDOW + RECOVERY_GOOD_POLLS) {
            st.record_and_plan(
                Duration::from_millis(10),
                Duration::from_millis(1),
                Duration::from_millis(100),
            );
        }
        assert!(
            st.current_sleep() < saturated,
            "sustained good polls should halve the sleep; before={:?} after={:?}",
            saturated,
            st.current_sleep()
        );
    }

    #[test]
    fn push_bounded_evicts_oldest_at_window() {
        let mut dq = VecDeque::new();
        for v in 0..(ROLLING_WINDOW as u64 + 4) {
            push_bounded(&mut dq, v);
        }
        assert_eq!(dq.len(), ROLLING_WINDOW);
        assert_eq!(dq.front(), Some(&4));
    }

    #[test]
    fn helpers_handle_empty_windows() {
        let empty: VecDeque<u64> = VecDeque::new();
        assert_eq!(median(&empty), 0);
        assert_eq!(rolling_cpu_ratio(&empty, &empty), 0.0);
    }
}