use chrono::{DateTime, Duration as ChronoDuration, Timelike, Utc};
use serde::Serialize;
use serde_json::{Value, json};
use sqlx::postgres::PgPool;
use sqlx::{Postgres, Row, Transaction};
/// Version tag of the scoring formula. The insert helpers in this file bind it into the
/// `formula_version` column and `pressure_meta` embeds it in the JSON metadata.
pub const CLIENT_PRESSURE_FORMULA_VERSION: &str = "v1";
/// NOTE(review): presumably the default `retention_days` handed to
/// `prune_client_pressure_snapshots`; not referenced in the visible code — confirm.
pub const CLIENT_PRESSURE_DEFAULT_RETENTION_DAYS: i64 = 30;
/// NOTE(review): looks like a cap on how many windows are processed per scheduler tick;
/// the consumer is outside the visible code — confirm.
pub const CLIENT_PRESSURE_MAX_WINDOWS_PER_TICK: i64 = 2;
/// NOTE(review): presumably the logging-pool saturation at which a run is skipped
/// (load shed); the consumer is outside the visible code — confirm.
pub const CLIENT_PRESSURE_LOGGING_SKIP_SATURATION: f64 = 0.60;
/// NOTE(review): presumably the per-client saturation at which a client is skipped;
/// the consumer is outside the visible code — confirm.
pub const CLIENT_PRESSURE_CLIENT_SKIP_SATURATION: f64 = 0.80;
/// Exclusive upper bound on saturation below which planner enrichment is permitted;
/// see `planner_enrichment_allowed`.
pub const CLIENT_PRESSURE_PLANNER_MAX_SATURATION: f64 = 0.50;
/// NOTE(review): presumably the `multiplier` passed to `is_load_snapshot_stale` — confirm.
pub const CLIENT_PRESSURE_STALE_LOAD_MULTIPLIER: i64 = 2;
/// Outcome of one pressure snapshot collection run.
///
/// Serde serializes the variants in snake_case; `as_str` returns the same spelling and is
/// what the insert helpers in this file bind into the `status` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ClientPressureRunStatus {
Completed,
// NOTE(review): presumably recorded when collection is skipped to shed load — confirm.
SkippedLoadShed,
Failed,
}
impl ClientPressureRunStatus {
/// Returns the stable snake_case name of this status.
pub fn as_str(self) -> &'static str {
match self {
Self::Completed => "completed",
Self::SkippedLoadShed => "skipped_load_shed",
Self::Failed => "failed",
}
}
}
/// Coarse severity bucket for a pressure score, as produced by `compute_pressure_band`.
///
/// Serde serializes the variants in snake_case; `as_str` returns the same spelling and is
/// bound into the `pressure_band` columns by the snapshot insert helper.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ClientPressureBand {
Green,
Yellow,
Red,
}
impl ClientPressureBand {
/// Returns the stable lowercase name of this band.
pub fn as_str(self) -> &'static str {
match self {
Self::Green => "green",
Self::Yellow => "yellow",
Self::Red => "red",
}
}
}
/// Suggested follow-up action for a client, as produced by `compute_placement_hint`.
///
/// Serde serializes the variants in snake_case; `as_str` returns the same spelling and is
/// bound into the `placement_hint` column by the snapshot insert helper.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ClientPressurePlacementHint {
Healthy,
Watch,
SplitHotTable,
MoveClient,
}
impl ClientPressurePlacementHint {
/// Returns the stable snake_case name of this hint.
pub fn as_str(self) -> &'static str {
match self {
Self::Healthy => "healthy",
Self::Watch => "watch",
Self::SplitHotTable => "split_hot_table",
Self::MoveClient => "move_client",
}
}
}
/// One row of `client_pressure_snapshot_runs`, as selected by
/// `get_latest_client_pressure_run`.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct ClientPressureSnapshotRunRecord {
pub id: i64,
pub recorded_at: DateTime<Utc>,
pub window_start: DateTime<Utc>,
pub window_end: DateTime<Utc>,
/// Status text; the insert helpers write `ClientPressureRunStatus::as_str()` here.
pub status: String,
pub skip_reason: Option<String>,
pub formula_version: String,
pub logging_saturation_ratio: Option<f64>,
pub pool_timeout_like_failure: bool,
pub collection_error: Option<String>,
/// Free-form JSON metadata stored with the run.
pub meta: Value,
}
/// One row of `client_pressure_snapshots`: the stored metrics for a single client in a
/// single window, as selected by the summary/history/detail queries in this file.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct ClientPressureSnapshotRecord {
pub id: i64,
/// Id of the `client_pressure_snapshot_runs` row this snapshot was inserted under.
pub run_id: i64,
pub recorded_at: DateTime<Utc>,
pub window_start: DateTime<Utc>,
pub window_end: DateTime<Utc>,
pub client_name: String,
pub request_count: i64,
pub failed_requests: i64,
pub cache_misses: i64,
pub total_work_ms: i64,
pub p50_duration_ms: Option<f64>,
pub p95_duration_ms: Option<f64>,
pub error_ratio: Option<f64>,
pub cache_miss_ratio: Option<f64>,
pub tail_ratio: Option<f64>,
pub burst_ratio: Option<f64>,
pub saturation_ratio: Option<f64>,
pub dominant_table_share: Option<f64>,
pub request_shock: Option<f64>,
pub error_shock: Option<f64>,
pub pressure_score: Option<f64>,
pub volatility_score: Option<f64>,
pub planner_score: Option<f64>,
/// Band text; the insert helper writes `ClientPressureBand::as_str()` here.
pub pressure_band: String,
/// Hint text; the insert helper writes `ClientPressurePlacementHint::as_str()` here.
pub placement_hint: String,
pub observed_only: bool,
pub planner_enrichment_attempted: bool,
pub planner_enrichment_applied: bool,
pub planner_enrichment_skipped_reason: Option<String>,
/// Free-form JSON metadata stored with the snapshot.
pub meta: Value,
}
/// One row of `client_table_pressure_snapshots`: the stored metrics for a single
/// (client, table) pair in a single window, as selected by
/// `get_latest_client_pressure_detail`.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct ClientTablePressureSnapshotRecord {
pub id: i64,
/// Id of the `client_pressure_snapshot_runs` row this snapshot was inserted under.
pub run_id: i64,
/// Id of the parent `client_pressure_snapshots` row.
pub client_snapshot_id: i64,
pub recorded_at: DateTime<Utc>,
pub window_start: DateTime<Utc>,
pub window_end: DateTime<Utc>,
pub client_name: String,
pub table_name: String,
pub request_count: i64,
pub failed_requests: i64,
pub cache_misses: i64,
pub total_work_ms: i64,
pub p50_duration_ms: Option<f64>,
pub p95_duration_ms: Option<f64>,
pub share_of_client_work: Option<f64>,
pub error_ratio: Option<f64>,
pub cache_miss_ratio: Option<f64>,
pub tail_ratio: Option<f64>,
pub burst_ratio: Option<f64>,
pub request_shock: Option<f64>,
pub error_shock: Option<f64>,
pub pressure_score: Option<f64>,
pub volatility_score: Option<f64>,
/// Band text; the insert helper writes `ClientPressureBand::as_str()` here.
pub pressure_band: String,
pub observed_only: bool,
/// Free-form JSON metadata stored with the snapshot.
pub meta: Value,
}
/// One row of `client_pressure_backfill_requests`, as returned by the enqueue and claim
/// helpers in this file.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct ClientPressureBackfillRequestRecord {
pub id: i64,
pub requested_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub claimed_at: Option<DateTime<Utc>>,
pub completed_at: Option<DateTime<Utc>>,
pub hours_back: i32,
/// Status text; the helpers in this file write 'pending', 'processing', 'completed'
/// and 'failed'.
pub status: String,
pub error_message: Option<String>,
pub formula_version: String,
/// Free-form JSON metadata; enqueued requests start with an empty object.
pub meta: Value,
}
/// Connection-pool reading for one client, as selected from `client_connections` by the
/// load snapshot queries in this file.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct ClientLoadSnapshot {
pub recorded_at: DateTime<Utc>,
pub client_name: String,
pub pool_size: i64,
pub idle_connections: i64,
pub active_connections: i64,
pub max_connections: i64,
/// Computed in SQL as `active_connections / max_connections`; `None` when
/// `max_connections` is not positive.
pub saturation_ratio: Option<f64>,
}
/// Per-client aggregate for one window, as produced by
/// `load_client_pressure_window_rows`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct ClientPressureWindowRow {
pub client_name: String,
pub request_count: i64,
pub failed_requests: i64,
pub cache_misses: i64,
pub total_work_ms: i64,
pub p50_duration_ms: Option<f64>,
pub p95_duration_ms: Option<f64>,
/// Median of hourly request counts over the 24 hours before the window start;
/// `None` when the client has no rows in that period.
pub trailing_24h_median_hourly_requests: Option<f64>,
/// Largest per-table sum of `duration_ms` for this client within the window.
pub dominant_table_work_ms: i64,
/// Counts for the hour immediately before the window; `None` when there were no rows.
pub previous_request_count: Option<i64>,
pub previous_failed_requests: Option<i64>,
}
/// Per-(client, table) aggregate for one window, as produced by
/// `load_client_table_pressure_window_rows`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct ClientTablePressureWindowRow {
pub client_name: String,
/// Table name; the query substitutes '<unknown>' when the logged name is NULL.
pub table_name: String,
pub request_count: i64,
pub failed_requests: i64,
pub cache_misses: i64,
pub total_work_ms: i64,
pub p50_duration_ms: Option<f64>,
pub p95_duration_ms: Option<f64>,
/// Median of hourly operation counts over the 24 hours before the window start;
/// `None` when the pair has no rows in that period.
pub trailing_24h_median_hourly_requests: Option<f64>,
/// Counts for the hour immediately before the window; `None` when there were no rows.
pub previous_request_count: Option<i64>,
pub previous_failed_requests: Option<i64>,
}
/// Inputs consumed by `compute_client_pressure`.
///
/// Optional fields fall back to defaults inside the computation when `None`.
#[derive(Debug, Clone)]
pub struct ClientPressureMetricInputs {
pub request_count: i64,
pub failed_requests: i64,
pub cache_misses: i64,
pub p50_duration_ms: Option<f64>,
pub p95_duration_ms: Option<f64>,
pub trailing_24h_median_hourly_requests: Option<f64>,
pub saturation_ratio: Option<f64>,
pub dominant_table_share: Option<f64>,
pub previous_request_count: Option<i64>,
pub previous_failed_requests: Option<i64>,
/// When present, blended into the pressure score with weight 0.15.
pub planner_score: Option<f64>,
}
/// Inputs consumed by `compute_table_pressure`.
///
/// Optional fields fall back to defaults inside the computation when `None`.
#[derive(Debug, Clone)]
pub struct ClientTablePressureMetricInputs {
pub request_count: i64,
pub failed_requests: i64,
pub cache_misses: i64,
pub p50_duration_ms: Option<f64>,
pub p95_duration_ms: Option<f64>,
pub trailing_24h_median_hourly_requests: Option<f64>,
pub share_of_client_work: Option<f64>,
pub previous_request_count: Option<i64>,
pub previous_failed_requests: Option<i64>,
}
/// Result of `compute_client_pressure`: the derived ratios, the two scores and the
/// classification derived from the pressure score.
#[derive(Debug, Clone)]
pub struct ClientPressureComputation {
pub error_ratio: f64,
pub cache_miss_ratio: f64,
pub tail_ratio: f64,
pub burst_ratio: f64,
pub request_shock: f64,
pub error_shock: f64,
pub pressure_score: f64,
pub volatility_score: f64,
pub pressure_band: ClientPressureBand,
pub placement_hint: ClientPressurePlacementHint,
/// True when the inputs carried no planner score.
pub observed_only: bool,
}
/// Result of `compute_table_pressure`: the derived ratios, the two scores and the band
/// derived from the pressure score.
#[derive(Debug, Clone)]
pub struct ClientTablePressureComputation {
pub error_ratio: f64,
pub cache_miss_ratio: f64,
pub tail_ratio: f64,
pub burst_ratio: f64,
pub request_shock: f64,
pub error_shock: f64,
pub pressure_score: f64,
pub volatility_score: f64,
pub pressure_band: ClientPressureBand,
}
/// Values for one `client_pressure_snapshots` row plus its per-table children, as
/// written by `insert_client_pressure_completed_run`.
#[derive(Debug, Clone)]
pub struct ClientPressureSnapshotInput {
pub client_name: String,
pub request_count: i64,
pub failed_requests: i64,
pub cache_misses: i64,
pub total_work_ms: i64,
pub p50_duration_ms: Option<f64>,
pub p95_duration_ms: Option<f64>,
pub error_ratio: Option<f64>,
pub cache_miss_ratio: Option<f64>,
pub tail_ratio: Option<f64>,
pub burst_ratio: Option<f64>,
pub saturation_ratio: Option<f64>,
pub dominant_table_share: Option<f64>,
pub request_shock: Option<f64>,
pub error_shock: Option<f64>,
pub pressure_score: Option<f64>,
pub volatility_score: Option<f64>,
pub planner_score: Option<f64>,
pub pressure_band: ClientPressureBand,
pub placement_hint: ClientPressurePlacementHint,
pub observed_only: bool,
pub planner_enrichment_attempted: bool,
pub planner_enrichment_applied: bool,
pub planner_enrichment_skipped_reason: Option<String>,
pub meta: Value,
/// Per-table rows inserted with this snapshot's generated id as `client_snapshot_id`.
pub table_snapshots: Vec<ClientTablePressureSnapshotInput>,
}
/// Values for one `client_table_pressure_snapshots` row, as written by
/// `insert_client_pressure_completed_run`.
#[derive(Debug, Clone)]
pub struct ClientTablePressureSnapshotInput {
/// Written to the row as-is; the insert helper does not copy it from the parent snapshot.
pub client_name: String,
pub table_name: String,
pub request_count: i64,
pub failed_requests: i64,
pub cache_misses: i64,
pub total_work_ms: i64,
pub p50_duration_ms: Option<f64>,
pub p95_duration_ms: Option<f64>,
pub share_of_client_work: Option<f64>,
pub error_ratio: Option<f64>,
pub cache_miss_ratio: Option<f64>,
pub tail_ratio: Option<f64>,
pub burst_ratio: Option<f64>,
pub request_shock: Option<f64>,
pub error_shock: Option<f64>,
pub pressure_score: Option<f64>,
pub volatility_score: Option<f64>,
pub pressure_band: ClientPressureBand,
pub observed_only: bool,
pub meta: Value,
}
/// Truncates `ts` to the start of its hour by zeroing minute, second and nanosecond.
///
/// If any of the component setters yields `None`, the original timestamp is returned
/// unchanged.
pub fn floor_to_hour(ts: DateTime<Utc>) -> DateTime<Utc> {
    let Some(minute_cleared) = ts.with_minute(0) else {
        return ts;
    };
    let Some(second_cleared) = minute_cleared.with_second(0) else {
        return ts;
    };
    second_cleared.with_nanosecond(0).unwrap_or(ts)
}
/// Returns `(start, end)` of the most recent fully elapsed hour relative to `now`.
///
/// `end` is `now` floored to the hour and `start` is exactly one hour earlier.
pub fn latest_closed_hour_window(now: DateTime<Utc>) -> (DateTime<Utc>, DateTime<Utc>) {
    let closed_at = floor_to_hour(now);
    let opened_at = closed_at - ChronoDuration::hours(1);
    (opened_at, closed_at)
}
/// Reports whether `snapshot` is older than `interval_secs * multiplier` seconds at `now`.
///
/// A `multiplier` below 1 is treated as 1, and the product saturates instead of
/// overflowing. The snapshot is stale only when its age in whole seconds is strictly
/// greater than the allowed age.
pub fn is_load_snapshot_stale(
    snapshot: &ClientLoadSnapshot,
    interval_secs: i64,
    multiplier: i64,
    now: DateTime<Utc>,
) -> bool {
    let effective_multiplier = multiplier.max(1);
    let allowed_age_secs = interval_secs.saturating_mul(effective_multiplier);
    let age = now.signed_duration_since(snapshot.recorded_at);
    age.num_seconds() > allowed_age_secs
}
/// Restricts `value` to the closed interval `[0.0, 1.0]`.
///
/// NaN and both infinities map to `0.0`; finite values below 0 become `0.0` and finite
/// values above 1 become `1.0`.
pub fn clamp_unit(value: f64) -> f64 {
    match value {
        v if v.is_nan() || v.is_infinite() => 0.0,
        v if v < 0.0 => 0.0,
        v if v > 1.0 => 1.0,
        v => v,
    }
}
/// Divides `num` by `denom` as floating point, treating any denominator below 1 as 1
/// so the division never hits zero.
fn ratio(num: i64, denom: i64) -> f64 {
    let safe_denom = if denom < 1 { 1 } else { denom };
    (num as f64) / (safe_denom as f64)
}
/// Ratio of the p95 latency to the p50 latency.
///
/// A missing p95 counts as 0 and a p95 at or below 0 yields `0.0`. The p50 denominator
/// is floored at 1.0 (a missing p50 therefore divides by 1.0).
fn tail_ratio(p95_duration_ms: Option<f64>, p50_duration_ms: Option<f64>) -> f64 {
    let numerator = p95_duration_ms.unwrap_or(0.0);
    if numerator <= 0.0 {
        return 0.0;
    }
    let denominator = p50_duration_ms.map_or(1.0, |p50| p50.max(1.0));
    numerator / denominator
}
/// Absolute change from `previous` to `current`, relative to `previous` floored at
/// `floor` so a tiny or zero baseline cannot blow up the ratio.
fn shock_ratio(current: f64, previous: f64, floor: f64) -> f64 {
    let delta = (current - previous).abs();
    let baseline = previous.max(floor);
    delta / baseline
}
/// Maps a pressure score to a band: `Red` at 70 and above, `Yellow` at 35 and above,
/// otherwise `Green` (which is also the result for NaN, since both comparisons fail).
pub fn compute_pressure_band(score: f64) -> ClientPressureBand {
    match score {
        s if s >= 70.0 => ClientPressureBand::Red,
        s if s >= 35.0 => ClientPressureBand::Yellow,
        _ => ClientPressureBand::Green,
    }
}
/// Chooses a placement hint from the pressure score and the dominant table's share.
///
/// * score >= 70 and share >= 0.50: `SplitHotTable`
/// * score >= 70 otherwise (including a missing share): `MoveClient`
/// * score >= 35: `Watch`
/// * anything else (including NaN): `Healthy`
pub fn compute_placement_hint(
    score: f64,
    dominant_table_share: Option<f64>,
) -> ClientPressurePlacementHint {
    // A missing share behaves like 0.0 and therefore never counts as concentrated.
    let concentrated = dominant_table_share.map_or(false, |share| share >= 0.50);
    match score {
        s if s >= 70.0 && concentrated => ClientPressurePlacementHint::SplitHotTable,
        s if s >= 70.0 => ClientPressurePlacementHint::MoveClient,
        s if s >= 35.0 => ClientPressurePlacementHint::Watch,
        _ => ClientPressurePlacementHint::Healthy,
    }
}
/// Converts a planner total cost into a 0–100 score on a log10 scale.
///
/// Negative costs are treated as 0. The score is `log10(cost + 1) / 5`, clamped to the
/// unit interval and scaled by 100, so costs of roughly 10^5 and above saturate at 100.
pub fn planner_score_from_total_cost(total_cost: f64) -> f64 {
    let non_negative_cost = total_cost.max(0.0);
    let magnitude = (non_negative_cost + 1.0).log10();
    let unit_score = clamp_unit(magnitude / 5.0);
    unit_score * 100.0
}
/// Derives the pressure and volatility scores for one client window.
///
/// The base score is a weighted sum (scaled to 0–100) of six unit-clamped components:
/// latency (p95 / 2000 ms, weight 0.30), errors (error ratio / 0.05, 0.20), cache misses
/// (miss ratio / 0.50, 0.10), burst (requests over the trailing median, / 4, 0.15),
/// saturation (/ 0.85, 0.10) and table concentration (/ 0.60, 0.15). When a planner
/// score is supplied the final score is `0.85 * base + 0.15 * planner`; otherwise the
/// base score is used and `observed_only` is set.
///
/// The volatility score blends request shock (0.45), tail ratio / 4 (0.35) and error
/// shock (0.20), each clamped to the unit interval.
pub fn compute_client_pressure(inputs: &ClientPressureMetricInputs) -> ClientPressureComputation {
    let requests = inputs.request_count;

    // Ratios for the current window.
    let error_ratio = ratio(inputs.failed_requests, requests);
    let cache_miss_ratio = ratio(inputs.cache_misses, requests);
    let tail_spread = tail_ratio(inputs.p95_duration_ms, inputs.p50_duration_ms);
    let median_hourly_requests = inputs
        .trailing_24h_median_hourly_requests
        .unwrap_or(1.0)
        .max(1.0);
    let burst_ratio = requests as f64 / median_hourly_requests;

    // Change relative to the previous window; missing history counts as zero.
    let prior_requests = inputs.previous_request_count.unwrap_or(0);
    let prior_failures = inputs.previous_failed_requests.unwrap_or(0);
    let prior_error_ratio = ratio(prior_failures, prior_requests);
    let request_shock = shock_ratio(requests as f64, prior_requests.max(1) as f64, 1.0);
    let error_shock = shock_ratio(error_ratio, prior_error_ratio, 0.01);

    // Weighted, unit-clamped components of the base score.
    let weighted_latency = 0.30 * clamp_unit(inputs.p95_duration_ms.unwrap_or(0.0) / 2000.0);
    let weighted_errors = 0.20 * clamp_unit(error_ratio / 0.05);
    let weighted_cache = 0.10 * clamp_unit(cache_miss_ratio / 0.50);
    let weighted_burst = 0.15 * clamp_unit(burst_ratio / 4.0);
    let weighted_saturation = 0.10 * clamp_unit(inputs.saturation_ratio.unwrap_or(0.0) / 0.85);
    let weighted_concentration =
        0.15 * clamp_unit(inputs.dominant_table_share.unwrap_or(0.0) / 0.60);
    let base_score = 100.0
        * (weighted_latency
            + weighted_errors
            + weighted_cache
            + weighted_burst
            + weighted_saturation
            + weighted_concentration);

    let weighted_request_shock = 0.45 * clamp_unit(request_shock);
    let weighted_tail = 0.35 * clamp_unit(tail_spread / 4.0);
    let weighted_error_shock = 0.20 * clamp_unit(error_shock);
    let volatility_score =
        100.0 * (weighted_request_shock + weighted_tail + weighted_error_shock);

    // Blend in the planner score when one is available.
    let (pressure_score, observed_only) = match inputs.planner_score {
        Some(planner) => ((0.85 * base_score) + (0.15 * planner), false),
        None => (base_score, true),
    };

    ClientPressureComputation {
        error_ratio,
        cache_miss_ratio,
        tail_ratio: tail_spread,
        burst_ratio,
        request_shock,
        error_shock,
        pressure_score,
        volatility_score,
        pressure_band: compute_pressure_band(pressure_score),
        placement_hint: compute_placement_hint(pressure_score, inputs.dominant_table_share),
        observed_only,
    }
}
/// Derives the pressure and volatility scores for one (client, table) window.
///
/// The pressure score is a weighted sum (scaled to 0–100) of five unit-clamped
/// components: share of client work (weight 0.35), latency (p95 / 2000 ms, 0.25), errors
/// (error ratio / 0.05, 0.20), cache misses (miss ratio / 0.50, 0.10) and burst
/// (requests over the trailing median, / 4, 0.10).
///
/// The volatility score blends request shock (0.45), tail ratio / 4 (0.35) and error
/// shock (0.20), each clamped to the unit interval.
pub fn compute_table_pressure(
    inputs: &ClientTablePressureMetricInputs,
) -> ClientTablePressureComputation {
    let requests = inputs.request_count;

    // Ratios for the current window.
    let error_ratio = ratio(inputs.failed_requests, requests);
    let cache_miss_ratio = ratio(inputs.cache_misses, requests);
    let tail_spread = tail_ratio(inputs.p95_duration_ms, inputs.p50_duration_ms);
    let median_hourly_requests = inputs
        .trailing_24h_median_hourly_requests
        .unwrap_or(1.0)
        .max(1.0);
    let burst_ratio = requests as f64 / median_hourly_requests;

    // Change relative to the previous window; missing history counts as zero.
    let prior_requests = inputs.previous_request_count.unwrap_or(0);
    let prior_failures = inputs.previous_failed_requests.unwrap_or(0);
    let prior_error_ratio = ratio(prior_failures, prior_requests);
    let request_shock = shock_ratio(requests as f64, prior_requests.max(1) as f64, 1.0);
    let error_shock = shock_ratio(error_ratio, prior_error_ratio, 0.01);

    // Weighted, unit-clamped components of the pressure score.
    let weighted_share = 0.35 * clamp_unit(inputs.share_of_client_work.unwrap_or(0.0));
    let weighted_latency = 0.25 * clamp_unit(inputs.p95_duration_ms.unwrap_or(0.0) / 2000.0);
    let weighted_errors = 0.20 * clamp_unit(error_ratio / 0.05);
    let weighted_cache = 0.10 * clamp_unit(cache_miss_ratio / 0.50);
    let weighted_burst = 0.10 * clamp_unit(burst_ratio / 4.0);
    let pressure_score = 100.0
        * (weighted_share + weighted_latency + weighted_errors + weighted_cache + weighted_burst);

    let weighted_request_shock = 0.45 * clamp_unit(request_shock);
    let weighted_tail = 0.35 * clamp_unit(tail_spread / 4.0);
    let weighted_error_shock = 0.20 * clamp_unit(error_shock);
    let volatility_score =
        100.0 * (weighted_request_shock + weighted_tail + weighted_error_shock);

    ClientTablePressureComputation {
        error_ratio,
        cache_miss_ratio,
        tail_ratio: tail_spread,
        burst_ratio,
        request_shock,
        error_shock,
        pressure_score,
        volatility_score,
        pressure_band: compute_pressure_band(pressure_score),
    }
}
pub fn planner_enrichment_allowed(
logging_saturation_ratio: Option<f64>,
client_saturation_ratio: Option<f64>,
) -> bool {
logging_saturation_ratio.unwrap_or(1.0) < CLIENT_PRESSURE_PLANNER_MAX_SATURATION
&& client_saturation_ratio.unwrap_or(1.0) < CLIENT_PRESSURE_PLANNER_MAX_SATURATION
}
/// Inserts a single row into `client_pressure_snapshot_runs` and returns its id.
///
/// `status` is stored via `ClientPressureRunStatus::as_str` and the formula version is
/// always `CLIENT_PRESSURE_FORMULA_VERSION`. No snapshot rows are written by this
/// function.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the insert.
pub async fn insert_client_pressure_run(
    logging_pool: &PgPool,
    window_start: DateTime<Utc>,
    window_end: DateTime<Utc>,
    status: ClientPressureRunStatus,
    skip_reason: Option<&str>,
    logging_saturation_ratio: Option<f64>,
    pool_timeout_like_failure: bool,
    collection_error: Option<&str>,
    meta: Value,
) -> Result<i64, sqlx::Error> {
    const INSERT_RUN_SQL: &str = r#"
INSERT INTO client_pressure_snapshot_runs (
window_start,
window_end,
status,
skip_reason,
formula_version,
logging_saturation_ratio,
pool_timeout_like_failure,
collection_error,
meta
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id
"#;

    let status_text = status.as_str();
    sqlx::query_scalar(INSERT_RUN_SQL)
        .bind(window_start)
        .bind(window_end)
        .bind(status_text)
        .bind(skip_reason)
        .bind(CLIENT_PRESSURE_FORMULA_VERSION)
        .bind(logging_saturation_ratio)
        .bind(pool_timeout_like_failure)
        .bind(collection_error)
        .bind(meta)
        .fetch_one(logging_pool)
        .await
}
/// Writes a completed run together with all of its client and table snapshots inside
/// one transaction, returning the id of the new run row.
///
/// The run row is inserted first with status `completed`, no skip reason, no collection
/// error and `pool_timeout_like_failure = FALSE`. Each client snapshot is then inserted
/// under that run id, followed by its table snapshots under the generated client
/// snapshot id. The transaction is committed only after every insert succeeded.
///
/// # Errors
/// Returns the first `sqlx::Error` encountered; in that case the function returns
/// before `commit`, so the transaction is dropped uncommitted.
pub async fn insert_client_pressure_completed_run(
    logging_pool: &PgPool,
    window_start: DateTime<Utc>,
    window_end: DateTime<Utc>,
    logging_saturation_ratio: Option<f64>,
    meta: Value,
    snapshots: &[ClientPressureSnapshotInput],
) -> Result<i64, sqlx::Error> {
    const INSERT_RUN_SQL: &str = r#"
INSERT INTO client_pressure_snapshot_runs (
window_start,
window_end,
status,
skip_reason,
formula_version,
logging_saturation_ratio,
pool_timeout_like_failure,
collection_error,
meta
)
VALUES ($1, $2, $3, NULL, $4, $5, FALSE, NULL, $6)
RETURNING id
"#;
    const INSERT_CLIENT_SNAPSHOT_SQL: &str = r#"
INSERT INTO client_pressure_snapshots (
run_id,
window_start,
window_end,
client_name,
request_count,
failed_requests,
cache_misses,
total_work_ms,
p50_duration_ms,
p95_duration_ms,
error_ratio,
cache_miss_ratio,
tail_ratio,
burst_ratio,
saturation_ratio,
dominant_table_share,
request_shock,
error_shock,
pressure_score,
volatility_score,
planner_score,
pressure_band,
placement_hint,
observed_only,
planner_enrichment_attempted,
planner_enrichment_applied,
planner_enrichment_skipped_reason,
meta
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8,
$9, $10, $11, $12, $13, $14, $15, $16,
$17, $18, $19, $20, $21, $22, $23, $24,
$25, $26, $27, $28
)
RETURNING id
"#;
    const INSERT_TABLE_SNAPSHOT_SQL: &str = r#"
INSERT INTO client_table_pressure_snapshots (
run_id,
client_snapshot_id,
window_start,
window_end,
client_name,
table_name,
request_count,
failed_requests,
cache_misses,
total_work_ms,
p50_duration_ms,
p95_duration_ms,
share_of_client_work,
error_ratio,
cache_miss_ratio,
tail_ratio,
burst_ratio,
request_shock,
error_shock,
pressure_score,
volatility_score,
pressure_band,
observed_only,
meta
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8,
$9, $10, $11, $12, $13, $14, $15, $16,
$17, $18, $19, $20, $21, $22, $23, $24
)
"#;

    let mut txn: Transaction<'_, Postgres> = logging_pool.begin().await?;

    // Parent row: the run itself.
    let completed_status = ClientPressureRunStatus::Completed.as_str();
    let run_id: i64 = sqlx::query_scalar(INSERT_RUN_SQL)
        .bind(window_start)
        .bind(window_end)
        .bind(completed_status)
        .bind(CLIENT_PRESSURE_FORMULA_VERSION)
        .bind(logging_saturation_ratio)
        .bind(meta)
        .fetch_one(&mut *txn)
        .await?;

    for client in snapshots {
        // One row per client; its generated id links the per-table rows below.
        let client_snapshot_id = sqlx::query_scalar::<_, i64>(INSERT_CLIENT_SNAPSHOT_SQL)
            .bind(run_id)
            .bind(window_start)
            .bind(window_end)
            .bind(&client.client_name)
            .bind(client.request_count)
            .bind(client.failed_requests)
            .bind(client.cache_misses)
            .bind(client.total_work_ms)
            .bind(client.p50_duration_ms)
            .bind(client.p95_duration_ms)
            .bind(client.error_ratio)
            .bind(client.cache_miss_ratio)
            .bind(client.tail_ratio)
            .bind(client.burst_ratio)
            .bind(client.saturation_ratio)
            .bind(client.dominant_table_share)
            .bind(client.request_shock)
            .bind(client.error_shock)
            .bind(client.pressure_score)
            .bind(client.volatility_score)
            .bind(client.planner_score)
            .bind(client.pressure_band.as_str())
            .bind(client.placement_hint.as_str())
            .bind(client.observed_only)
            .bind(client.planner_enrichment_attempted)
            .bind(client.planner_enrichment_applied)
            .bind(client.planner_enrichment_skipped_reason.as_deref())
            .bind(&client.meta)
            .fetch_one(&mut *txn)
            .await?;

        for table_row in &client.table_snapshots {
            sqlx::query(INSERT_TABLE_SNAPSHOT_SQL)
                .bind(run_id)
                .bind(client_snapshot_id)
                .bind(window_start)
                .bind(window_end)
                .bind(&table_row.client_name)
                .bind(&table_row.table_name)
                .bind(table_row.request_count)
                .bind(table_row.failed_requests)
                .bind(table_row.cache_misses)
                .bind(table_row.total_work_ms)
                .bind(table_row.p50_duration_ms)
                .bind(table_row.p95_duration_ms)
                .bind(table_row.share_of_client_work)
                .bind(table_row.error_ratio)
                .bind(table_row.cache_miss_ratio)
                .bind(table_row.tail_ratio)
                .bind(table_row.burst_ratio)
                .bind(table_row.request_shock)
                .bind(table_row.error_shock)
                .bind(table_row.pressure_score)
                .bind(table_row.volatility_score)
                .bind(table_row.pressure_band.as_str())
                .bind(table_row.observed_only)
                .bind(&table_row.meta)
                .execute(&mut *txn)
                .await?;
        }
    }

    txn.commit().await?;
    Ok(run_id)
}
/// Deletes rows of `client_pressure_snapshot_runs` recorded more than `retention_days`
/// days ago and returns how many run rows were removed.
///
/// A non-positive `retention_days` is treated as "pruning disabled": nothing is deleted
/// and `Ok(0)` is returned. Without this guard the cutoff would be at or after "now" and
/// every run would be deleted. Very large values are capped so the chrono duration
/// arithmetic cannot panic on overflow.
///
/// Only the runs table is targeted here.
/// NOTE(review): presumably the snapshot tables are cleaned up through
/// `ON DELETE CASCADE` foreign keys — confirm against the schema.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the delete.
pub async fn prune_client_pressure_snapshots(
    logging_pool: &PgPool,
    retention_days: i64,
) -> Result<u64, sqlx::Error> {
    // About 100 years: beyond any realistic retention, far inside chrono's range.
    const MAX_RETENTION_DAYS: i64 = 36_500;

    if retention_days <= 0 {
        return Ok(0);
    }
    let bounded_days = retention_days.min(MAX_RETENTION_DAYS);
    let cutoff = Utc::now() - ChronoDuration::days(bounded_days);
    sqlx::query(
        r#"
DELETE FROM client_pressure_snapshot_runs
WHERE recorded_at < $1
"#,
    )
    .bind(cutoff)
    .execute(logging_pool)
    .await
    .map(|result| result.rows_affected())
}
/// Inserts a new backfill request in status 'pending' and returns the stored row.
///
/// The request records `hours_back`, the current `CLIENT_PRESSURE_FORMULA_VERSION` and
/// an empty JSON object as metadata.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the insert.
pub async fn enqueue_client_pressure_backfill_request(
    logging_pool: &PgPool,
    hours_back: i32,
) -> Result<ClientPressureBackfillRequestRecord, sqlx::Error> {
    const ENQUEUE_SQL: &str = r#"
INSERT INTO client_pressure_backfill_requests (
hours_back,
status,
formula_version,
meta
)
VALUES ($1, 'pending', $2, '{}'::jsonb)
RETURNING
id,
requested_at,
updated_at,
claimed_at,
completed_at,
hours_back,
status,
error_message,
formula_version,
meta
"#;

    let enqueue = sqlx::query_as(ENQUEUE_SQL)
        .bind(hours_back)
        .bind(CLIENT_PRESSURE_FORMULA_VERSION);
    enqueue.fetch_one(logging_pool).await
}
/// Claims the oldest 'pending' backfill request, if any, and returns it.
///
/// The candidate row is selected with `FOR UPDATE SKIP LOCKED`, so rows locked by
/// another transaction are skipped. The claimed row is moved to 'processing', its
/// `claimed_at` and `updated_at` are set to `now()` and its `error_message` is cleared.
/// Returns `Ok(None)` when no pending request could be claimed.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the statement.
pub async fn claim_client_pressure_backfill_request(
    logging_pool: &PgPool,
) -> Result<Option<ClientPressureBackfillRequestRecord>, sqlx::Error> {
    const CLAIM_SQL: &str = r#"
WITH next_request AS (
SELECT id
FROM client_pressure_backfill_requests
WHERE status = 'pending'
ORDER BY requested_at
LIMIT 1
FOR UPDATE SKIP LOCKED
)
UPDATE client_pressure_backfill_requests requests
SET
status = 'processing',
claimed_at = now(),
updated_at = now(),
error_message = NULL
FROM next_request
WHERE requests.id = next_request.id
RETURNING
requests.id,
requests.requested_at,
requests.updated_at,
requests.claimed_at,
requests.completed_at,
requests.hours_back,
requests.status,
requests.error_message,
requests.formula_version,
requests.meta
"#;

    sqlx::query_as(CLAIM_SQL).fetch_optional(logging_pool).await
}
/// Releases a claimed backfill request after a processing pass.
///
/// With `has_remaining_windows` the request goes back to 'pending' with no completion
/// time so it can be claimed again; otherwise it becomes 'completed' with
/// `completed_at` set to the current application time. In both cases `claimed_at` and
/// `error_message` are reset to NULL and `updated_at` is set to `now()`.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the update.
pub async fn finish_client_pressure_backfill_request(
    logging_pool: &PgPool,
    request_id: i64,
    has_remaining_windows: bool,
) -> Result<(), sqlx::Error> {
    const FINISH_SQL: &str = r#"
UPDATE client_pressure_backfill_requests
SET
status = $2,
completed_at = $3,
claimed_at = $4,
updated_at = now(),
error_message = NULL
WHERE id = $1
"#;

    let next_status: &str = if has_remaining_windows {
        "pending"
    } else {
        "completed"
    };
    let completed_at: Option<DateTime<Utc>> = if has_remaining_windows {
        None
    } else {
        Some(Utc::now())
    };
    // The claim marker is cleared on both paths.
    let claimed_at: Option<DateTime<Utc>> = None;

    sqlx::query(FINISH_SQL)
        .bind(request_id)
        .bind(next_status)
        .bind(completed_at)
        .bind(claimed_at)
        .execute(logging_pool)
        .await
        .map(|_| ())
}
/// Marks a backfill request as 'failed', storing `error_message` and setting both
/// `completed_at` and `updated_at` to `now()`.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the update.
pub async fn fail_client_pressure_backfill_request(
    logging_pool: &PgPool,
    request_id: i64,
    error_message: &str,
) -> Result<(), sqlx::Error> {
    const FAIL_SQL: &str = r#"
UPDATE client_pressure_backfill_requests
SET
status = 'failed',
completed_at = now(),
updated_at = now(),
error_message = $2
WHERE id = $1
"#;

    sqlx::query(FAIL_SQL)
        .bind(request_id)
        .bind(error_message)
        .execute(logging_pool)
        .await
        .map(|_| ())
}
/// Lists the start times of hourly windows in `[start, end)` that have no run with
/// status 'completed', oldest first, capped at `limit` entries.
///
/// Candidate windows are generated in one-hour steps beginning at `start`. An empty
/// range (`end <= start`) or a non-positive `limit` yields an empty list without
/// touching the database.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the query.
pub async fn list_missing_completed_pressure_windows(
    logging_pool: &PgPool,
    start: DateTime<Utc>,
    end: DateTime<Utc>,
    limit: i64,
) -> Result<Vec<DateTime<Utc>>, sqlx::Error> {
    const MISSING_WINDOWS_SQL: &str = r#"
SELECT series.window_start
FROM generate_series($1, $2 - interval '1 hour', interval '1 hour') AS series(window_start)
WHERE NOT EXISTS (
SELECT 1
FROM client_pressure_snapshot_runs runs
WHERE runs.window_start = series.window_start
AND runs.status = 'completed'
)
ORDER BY series.window_start
LIMIT $3
"#;

    let has_work = end > start && limit > 0;
    if !has_work {
        return Ok(vec![]);
    }

    sqlx::query_scalar(MISSING_WINDOWS_SQL)
        .bind(start)
        .bind(end)
        .bind(limit)
        .fetch_all(logging_pool)
        .await
}
/// Fetches the most recently recorded run (any status), or `None` when the runs table
/// is empty. Ties on `recorded_at` are broken by the higher id.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the query.
pub async fn get_latest_client_pressure_run(
    logging_pool: &PgPool,
) -> Result<Option<ClientPressureSnapshotRunRecord>, sqlx::Error> {
    const LATEST_RUN_SQL: &str = r#"
SELECT
id,
recorded_at,
window_start,
window_end,
status,
skip_reason,
formula_version,
logging_saturation_ratio,
pool_timeout_like_failure,
collection_error,
meta
FROM client_pressure_snapshot_runs
ORDER BY recorded_at DESC, id DESC
LIMIT 1
"#;

    sqlx::query_as(LATEST_RUN_SQL)
        .fetch_optional(logging_pool)
        .await
}
/// Returns one snapshot per client: the row with the latest `window_start` (ties broken
/// by the higher id), ordered by client name.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the query.
pub async fn list_latest_client_pressure_summaries(
    logging_pool: &PgPool,
) -> Result<Vec<ClientPressureSnapshotRecord>, sqlx::Error> {
    const LATEST_PER_CLIENT_SQL: &str = r#"
SELECT DISTINCT ON (client_name)
id,
run_id,
recorded_at,
window_start,
window_end,
client_name,
request_count,
failed_requests,
cache_misses,
total_work_ms,
p50_duration_ms,
p95_duration_ms,
error_ratio,
cache_miss_ratio,
tail_ratio,
burst_ratio,
saturation_ratio,
dominant_table_share,
request_shock,
error_shock,
pressure_score,
volatility_score,
planner_score,
pressure_band,
placement_hint,
observed_only,
planner_enrichment_attempted,
planner_enrichment_applied,
planner_enrichment_skipped_reason,
meta
FROM client_pressure_snapshots
ORDER BY client_name, window_start DESC, id DESC
"#;

    sqlx::query_as(LATEST_PER_CLIENT_SQL)
        .fetch_all(logging_pool)
        .await
}
/// Returns up to `limit` snapshots for `client_name`, newest window first (ties broken
/// by the higher id).
///
/// A non-positive `limit` yields an empty list without touching the database, matching
/// `list_missing_completed_pressure_windows`; a negative value would otherwise be
/// rejected by PostgreSQL's `LIMIT` clause.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the query.
pub async fn list_recent_client_pressure_history(
    logging_pool: &PgPool,
    client_name: &str,
    limit: i64,
) -> Result<Vec<ClientPressureSnapshotRecord>, sqlx::Error> {
    if limit <= 0 {
        return Ok(Vec::new());
    }
    sqlx::query_as(
        r#"
SELECT
id,
run_id,
recorded_at,
window_start,
window_end,
client_name,
request_count,
failed_requests,
cache_misses,
total_work_ms,
p50_duration_ms,
p95_duration_ms,
error_ratio,
cache_miss_ratio,
tail_ratio,
burst_ratio,
saturation_ratio,
dominant_table_share,
request_shock,
error_shock,
pressure_score,
volatility_score,
planner_score,
pressure_band,
placement_hint,
observed_only,
planner_enrichment_attempted,
planner_enrichment_applied,
planner_enrichment_skipped_reason,
meta
FROM client_pressure_snapshots
WHERE client_name = $1
ORDER BY window_start DESC, id DESC
LIMIT $2
"#,
    )
    .bind(client_name)
    .bind(limit)
    .fetch_all(logging_pool)
    .await
}
/// Fetches the newest snapshot for `client_name` together with its table snapshots.
///
/// Returns `Ok(None)` when the client has no snapshot. Table rows are those whose
/// `client_snapshot_id` equals the snapshot's id, ordered by pressure score descending
/// (NULLs last), then total work descending, then table name. The two queries run as
/// separate statements against the pool.
///
/// # Errors
/// Returns the `sqlx::Error` raised by either query.
pub async fn get_latest_client_pressure_detail(
    logging_pool: &PgPool,
    client_name: &str,
) -> Result<
    Option<(
        ClientPressureSnapshotRecord,
        Vec<ClientTablePressureSnapshotRecord>,
    )>,
    sqlx::Error,
> {
    const LATEST_SNAPSHOT_SQL: &str = r#"
SELECT
id,
run_id,
recorded_at,
window_start,
window_end,
client_name,
request_count,
failed_requests,
cache_misses,
total_work_ms,
p50_duration_ms,
p95_duration_ms,
error_ratio,
cache_miss_ratio,
tail_ratio,
burst_ratio,
saturation_ratio,
dominant_table_share,
request_shock,
error_shock,
pressure_score,
volatility_score,
planner_score,
pressure_band,
placement_hint,
observed_only,
planner_enrichment_attempted,
planner_enrichment_applied,
planner_enrichment_skipped_reason,
meta
FROM client_pressure_snapshots
WHERE client_name = $1
ORDER BY window_start DESC, id DESC
LIMIT 1
"#;
    const TABLE_SNAPSHOTS_SQL: &str = r#"
SELECT
id,
run_id,
client_snapshot_id,
recorded_at,
window_start,
window_end,
client_name,
table_name,
request_count,
failed_requests,
cache_misses,
total_work_ms,
p50_duration_ms,
p95_duration_ms,
share_of_client_work,
error_ratio,
cache_miss_ratio,
tail_ratio,
burst_ratio,
request_shock,
error_shock,
pressure_score,
volatility_score,
pressure_band,
observed_only,
meta
FROM client_table_pressure_snapshots
WHERE client_snapshot_id = $1
ORDER BY pressure_score DESC NULLS LAST, total_work_ms DESC, table_name
"#;

    let latest: Option<ClientPressureSnapshotRecord> = sqlx::query_as(LATEST_SNAPSHOT_SQL)
        .bind(client_name)
        .fetch_optional(logging_pool)
        .await?;

    match latest {
        None => Ok(None),
        Some(snapshot) => {
            let tables: Vec<ClientTablePressureSnapshotRecord> =
                sqlx::query_as(TABLE_SNAPSHOTS_SQL)
                    .bind(snapshot.id)
                    .fetch_all(logging_pool)
                    .await?;
            Ok(Some((snapshot, tables)))
        }
    }
}
/// Returns the most recent `client_connections` reading for every client, ordered by
/// client name.
///
/// `saturation_ratio` is computed in SQL as `active_connections / max_connections` and
/// is NULL when `max_connections` is not positive.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the query.
pub async fn list_latest_client_load_snapshots(
    logging_pool: &PgPool,
) -> Result<Vec<ClientLoadSnapshot>, sqlx::Error> {
    const LATEST_LOAD_PER_CLIENT_SQL: &str = r#"
SELECT DISTINCT ON (client_name)
recorded_at,
client_name,
pool_size,
idle_connections,
active_connections,
max_connections,
CASE
WHEN max_connections > 0
THEN active_connections::double precision / max_connections::double precision
ELSE NULL
END AS saturation_ratio
FROM client_connections
ORDER BY client_name, recorded_at DESC
"#;

    sqlx::query_as(LATEST_LOAD_PER_CLIENT_SQL)
        .fetch_all(logging_pool)
        .await
}
/// Returns the most recent `client_connections` reading for `client_name`, or `None`
/// when the client has no rows.
///
/// `saturation_ratio` is computed in SQL as `active_connections / max_connections` and
/// is NULL when `max_connections` is not positive.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the query.
pub async fn get_latest_client_load_snapshot(
    logging_pool: &PgPool,
    client_name: &str,
) -> Result<Option<ClientLoadSnapshot>, sqlx::Error> {
    const LATEST_LOAD_SQL: &str = r#"
SELECT
recorded_at,
client_name,
pool_size,
idle_connections,
active_connections,
max_connections,
CASE
WHEN max_connections > 0
THEN active_connections::double precision / max_connections::double precision
ELSE NULL
END AS saturation_ratio
FROM client_connections
WHERE client_name = $1
ORDER BY recorded_at DESC
LIMIT 1
"#;

    sqlx::query_as(LATEST_LOAD_SQL)
        .bind(client_name)
        .fetch_optional(logging_pool)
        .await
}
/// Aggregates `gateway_request_log` per client for the window
/// `[window_start, window_end)`, ordered by client name.
///
/// Per client the query yields:
/// * request, failure (status code >= 400, NULL treated as 200) and cache-miss counts
///   (`cached IS NOT TRUE` with a lookup outcome of 'miss');
/// * total, p50 and p95 of `duration_ms`;
/// * the median hourly request count over the 24 hours before `window_start`, taken
///   over hour buckets that contain at least one request;
/// * the largest per-table `duration_ms` sum from `gateway_operation_log` in the window;
/// * request and failure counts for the hour immediately before `window_start`.
///
/// Rows with a NULL client are ignored. Values that come from the left joins are NULL
/// when the client has no matching history.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the query.
pub async fn load_client_pressure_window_rows(
    logging_pool: &PgPool,
    window_start: DateTime<Utc>,
    window_end: DateTime<Utc>,
) -> Result<Vec<ClientPressureWindowRow>, sqlx::Error> {
    const CLIENT_WINDOW_SQL: &str = r#"
WITH table_work AS (
SELECT
client AS client_name,
COALESCE(table_name, '<unknown>') AS table_name,
COALESCE(SUM(duration_ms), 0)::bigint AS table_work_ms
FROM gateway_operation_log
WHERE created_at >= $1
AND created_at < $2
AND client IS NOT NULL
GROUP BY client, COALESCE(table_name, '<unknown>')
),
dominant_table AS (
SELECT
client_name,
COALESCE(MAX(table_work_ms), 0)::bigint AS dominant_table_work_ms
FROM table_work
GROUP BY client_name
),
previous_requests AS (
SELECT
client AS client_name,
COUNT(*)::bigint AS request_count,
COUNT(*) FILTER (
WHERE COALESCE(status_code, 200) >= 400
)::bigint AS failed_requests
FROM gateway_request_log
WHERE created_at >= $1 - interval '1 hour'
AND created_at < $1
AND client IS NOT NULL
GROUP BY client
),
trailing_hourly AS (
SELECT
client_name,
percentile_cont(0.5) WITHIN GROUP (ORDER BY hourly_requests)::float8
AS trailing_24h_median_hourly_requests
FROM (
SELECT
client AS client_name,
date_trunc('hour', created_at) AS bucket_start,
COUNT(*)::bigint AS hourly_requests
FROM gateway_request_log
WHERE created_at >= $1 - interval '24 hours'
AND created_at < $1
AND client IS NOT NULL
GROUP BY client, date_trunc('hour', created_at)
) buckets
GROUP BY client_name
)
SELECT
requests.client AS client_name,
COUNT(*)::bigint AS request_count,
COUNT(*) FILTER (
WHERE COALESCE(requests.status_code, 200) >= 400
)::bigint AS failed_requests,
COUNT(*) FILTER (
WHERE requests.cached IS NOT TRUE
AND COALESCE(requests.cache_lookup_outcome, '') = 'miss'
)::bigint AS cache_misses,
COALESCE(SUM(requests.duration_ms), 0)::bigint AS total_work_ms,
percentile_cont(0.5) WITHIN GROUP (ORDER BY requests.duration_ms)::float8 AS p50_duration_ms,
percentile_cont(0.95) WITHIN GROUP (ORDER BY requests.duration_ms)::float8 AS p95_duration_ms,
trailing_hourly.trailing_24h_median_hourly_requests,
COALESCE(dominant_table.dominant_table_work_ms, 0)::bigint AS dominant_table_work_ms,
previous_requests.request_count AS previous_request_count,
previous_requests.failed_requests AS previous_failed_requests
FROM gateway_request_log requests
LEFT JOIN trailing_hourly
ON trailing_hourly.client_name = requests.client
LEFT JOIN dominant_table
ON dominant_table.client_name = requests.client
LEFT JOIN previous_requests
ON previous_requests.client_name = requests.client
WHERE requests.created_at >= $1
AND requests.created_at < $2
AND requests.client IS NOT NULL
GROUP BY
requests.client,
trailing_hourly.trailing_24h_median_hourly_requests,
dominant_table.dominant_table_work_ms,
previous_requests.request_count,
previous_requests.failed_requests
ORDER BY requests.client
"#;

    sqlx::query_as(CLIENT_WINDOW_SQL)
        .bind(window_start)
        .bind(window_end)
        .fetch_all(logging_pool)
        .await
}
/// Aggregates `gateway_operation_log` per (client, table) for the window
/// `[window_start, window_end)`, ordered by client and table name.
///
/// A NULL table name is reported as '<unknown>'. Per pair the query yields:
/// * operation, failure (status code >= 400 with NULL treated as 200, or `error` true)
///   and cache-miss counts (`cached IS NOT TRUE` with a lookup outcome of 'miss');
/// * total, p50 and p95 of `duration_ms`;
/// * the median hourly operation count over the 24 hours before `window_start`, taken
///   over hour buckets that contain at least one operation;
/// * operation and failure counts for the hour immediately before `window_start`.
///
/// Rows with a NULL client are ignored. Values that come from the left joins are NULL
/// when the pair has no matching history.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the query.
pub async fn load_client_table_pressure_window_rows(
    logging_pool: &PgPool,
    window_start: DateTime<Utc>,
    window_end: DateTime<Utc>,
) -> Result<Vec<ClientTablePressureWindowRow>, sqlx::Error> {
    const TABLE_WINDOW_SQL: &str = r#"
WITH previous_operations AS (
SELECT
client AS client_name,
COALESCE(table_name, '<unknown>') AS table_name,
COUNT(*)::bigint AS request_count,
COUNT(*) FILTER (
WHERE COALESCE(status_code, 200) >= 400 OR COALESCE(error, FALSE)
)::bigint AS failed_requests
FROM gateway_operation_log
WHERE created_at >= $1 - interval '1 hour'
AND created_at < $1
AND client IS NOT NULL
GROUP BY client, COALESCE(table_name, '<unknown>')
),
trailing_hourly AS (
SELECT
client_name,
table_name,
percentile_cont(0.5) WITHIN GROUP (ORDER BY hourly_requests)::float8
AS trailing_24h_median_hourly_requests
FROM (
SELECT
client AS client_name,
COALESCE(table_name, '<unknown>') AS table_name,
date_trunc('hour', created_at) AS bucket_start,
COUNT(*)::bigint AS hourly_requests
FROM gateway_operation_log
WHERE created_at >= $1 - interval '24 hours'
AND created_at < $1
AND client IS NOT NULL
GROUP BY client, COALESCE(table_name, '<unknown>'), date_trunc('hour', created_at)
) buckets
GROUP BY client_name, table_name
)
SELECT
operations.client AS client_name,
COALESCE(operations.table_name, '<unknown>') AS table_name,
COUNT(*)::bigint AS request_count,
COUNT(*) FILTER (
WHERE COALESCE(operations.status_code, 200) >= 400 OR COALESCE(operations.error, FALSE)
)::bigint AS failed_requests,
COUNT(*) FILTER (
WHERE operations.cached IS NOT TRUE
AND COALESCE(operations.cache_lookup_outcome, '') = 'miss'
)::bigint AS cache_misses,
COALESCE(SUM(operations.duration_ms), 0)::bigint AS total_work_ms,
percentile_cont(0.5) WITHIN GROUP (ORDER BY operations.duration_ms)::float8 AS p50_duration_ms,
percentile_cont(0.95) WITHIN GROUP (ORDER BY operations.duration_ms)::float8 AS p95_duration_ms,
trailing_hourly.trailing_24h_median_hourly_requests,
previous_operations.request_count AS previous_request_count,
previous_operations.failed_requests AS previous_failed_requests
FROM gateway_operation_log operations
LEFT JOIN trailing_hourly
ON trailing_hourly.client_name = operations.client
AND trailing_hourly.table_name = COALESCE(operations.table_name, '<unknown>')
LEFT JOIN previous_operations
ON previous_operations.client_name = operations.client
AND previous_operations.table_name = COALESCE(operations.table_name, '<unknown>')
WHERE operations.created_at >= $1
AND operations.created_at < $2
AND operations.client IS NOT NULL
GROUP BY
operations.client,
COALESCE(operations.table_name, '<unknown>'),
trailing_hourly.trailing_24h_median_hourly_requests,
previous_operations.request_count,
previous_operations.failed_requests
ORDER BY operations.client, COALESCE(operations.table_name, '<unknown>')
"#;

    sqlx::query_as(TABLE_WINDOW_SQL)
        .bind(window_start)
        .bind(window_end)
        .fetch_all(logging_pool)
        .await
}
/// Returns the text of up to `limit` of the slowest 'query' operations logged for
/// `client_name` in `[window_start, window_end)`.
///
/// Only operations whose `details` JSON has a 'query' key and whose status code (NULL
/// treated as 200) is below 500 are considered; they are ranked by duration descending,
/// then by creation time descending. Entries whose query text is NULL are dropped after
/// the limit is applied, so fewer than `limit` strings may come back. Rows whose
/// `query_text` column cannot be decoded are skipped rather than reported.
///
/// A non-positive `limit` yields an empty list without touching the database, matching
/// `list_missing_completed_pressure_windows`; a negative value would otherwise be
/// rejected by PostgreSQL's `LIMIT` clause.
///
/// # Errors
/// Returns the `sqlx::Error` raised by the query.
pub async fn load_pressure_sample_queries(
    logging_pool: &PgPool,
    client_name: &str,
    window_start: DateTime<Utc>,
    window_end: DateTime<Utc>,
    limit: i64,
) -> Result<Vec<String>, sqlx::Error> {
    if limit <= 0 {
        return Ok(Vec::new());
    }
    let rows = sqlx::query(
        r#"
SELECT query_text
FROM (
SELECT
details->>'query' AS query_text,
COALESCE(duration_ms, 0) AS duration_ms,
created_at
FROM gateway_operation_log
WHERE client = $1
AND operation = 'query'
AND created_at >= $2
AND created_at < $3
AND details ? 'query'
AND COALESCE(status_code, 200) < 500
ORDER BY duration_ms DESC, created_at DESC
LIMIT $4
) ranked
WHERE query_text IS NOT NULL
"#,
    )
    .bind(client_name)
    .bind(window_start)
    .bind(window_end)
    .bind(limit)
    .fetch_all(logging_pool)
    .await?;
    Ok(rows
        .into_iter()
        .filter_map(|row| {
            row.try_get::<Option<String>, _>("query_text")
                .ok()
                .flatten()
        })
        .collect())
}
pub fn pressure_meta(
logging_saturation_ratio: Option<f64>,
planner_enrichment_skipped_reason: Option<&str>,
) -> Value {
json!({
"formula_version": CLIENT_PRESSURE_FORMULA_VERSION,
"logging_saturation_ratio": logging_saturation_ratio,
"planner_enrichment_skipped_reason": planner_enrichment_skipped_reason,
})
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds client-level metric inputs with fixed latency, baseline,
    /// saturation and previous-window values, so each test only varies the
    /// counters and the dominant table share.
    fn client_inputs(
        request_count: i64,
        failed_requests: i64,
        cache_misses: i64,
        dominant_table_share: f64,
    ) -> ClientPressureMetricInputs {
        ClientPressureMetricInputs {
            request_count,
            failed_requests,
            cache_misses,
            p50_duration_ms: Some(100.0),
            p95_duration_ms: Some(2200.0),
            trailing_24h_median_hourly_requests: Some(25.0),
            saturation_ratio: Some(0.90),
            dominant_table_share: Some(dominant_table_share),
            previous_request_count: Some(10),
            previous_failed_requests: Some(0),
            planner_score: None,
        }
    }

    /// A red client with a 0.60 dominant table share gets the split hint.
    #[test]
    fn client_pressure_uses_split_hot_table_for_red_concentrated_clients() {
        let concentrated = client_inputs(100, 10, 50, 0.60);
        let outcome = compute_client_pressure(&concentrated);
        assert_eq!(outcome.pressure_band, ClientPressureBand::Red);
        assert_eq!(
            outcome.placement_hint,
            ClientPressurePlacementHint::SplitHotTable
        );
    }

    /// A red client with a 0.20 dominant table share gets the move hint.
    #[test]
    fn client_pressure_uses_move_client_for_red_distributed_clients() {
        let distributed = client_inputs(100, 10, 50, 0.20);
        let outcome = compute_client_pressure(&distributed);
        assert_eq!(outcome.pressure_band, ClientPressureBand::Red);
        assert_eq!(
            outcome.placement_hint,
            ClientPressurePlacementHint::MoveClient
        );
    }

    /// A single fast, successful request against a large baseline is green.
    #[test]
    fn table_pressure_band_thresholds_match_pressure_band_rules() {
        let quiet_table = ClientTablePressureMetricInputs {
            request_count: 1,
            failed_requests: 0,
            cache_misses: 0,
            p50_duration_ms: Some(10.0),
            p95_duration_ms: Some(10.0),
            trailing_24h_median_hourly_requests: Some(100.0),
            share_of_client_work: Some(0.01),
            previous_request_count: Some(1),
            previous_failed_requests: Some(0),
        };
        let outcome = compute_table_pressure(&quiet_table);
        assert_eq!(outcome.pressure_band, ClientPressureBand::Green);
    }

    /// A larger planner cost yields a larger score that stays at or below 100.
    #[test]
    fn planner_score_scales_logarithmically() {
        let cheap_plan_score = planner_score_from_total_cost(100.0);
        let costly_plan_score = planner_score_from_total_cost(100_000.0);
        assert!(costly_plan_score > cheap_plan_score);
        assert!(costly_plan_score <= 100.0);
    }

    /// A snapshot 901 seconds old is stale with arguments (300, 3) and fresh
    /// with (300, 4).
    #[test]
    fn stale_load_snapshots_are_detected_from_interval_budget() {
        let now = Utc::now();
        let age = ChronoDuration::seconds(901);
        let snapshot = ClientLoadSnapshot {
            recorded_at: now - age,
            client_name: "athena_logging".to_string(),
            pool_size: 2,
            idle_connections: 0,
            active_connections: 2,
            max_connections: 2,
            saturation_ratio: Some(1.0),
        };
        // NOTE(review): presumably the budget is interval x multiplier
        // (900s vs 1200s here) -- confirm against `is_load_snapshot_stale`.
        let stale_with_multiplier_three = is_load_snapshot_stale(&snapshot, 300, 3, now);
        let stale_with_multiplier_four = is_load_snapshot_stale(&snapshot, 300, 4, now);
        assert!(stale_with_multiplier_three);
        assert!(!stale_with_multiplier_four);
    }

    /// Enrichment is allowed only when neither saturation argument is 0.60.
    #[test]
    fn planner_enrichment_requires_both_logging_and_client_headroom() {
        // (first argument, second argument, expected result)
        let cases = [
            (Some(0.10), Some(0.20), true),
            (Some(0.60), Some(0.20), false),
            (Some(0.10), Some(0.60), false),
        ];
        for (first_saturation, second_saturation, expected) in cases {
            assert_eq!(
                planner_enrichment_allowed(first_saturation, second_saturation),
                expected
            );
        }
    }
}