use chrono::{DateTime, Utc};
use serde::Serialize;
use serde_json::{Value, json};
use sqlx::Row;
use sqlx::postgres::{PgPool, PgRow};
use sqlx::types::Json;
#[derive(Debug, Clone, Serialize)]
pub struct QueryOptimizationRecommendationRecord {
pub id: i64,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub client_name: String,
pub recommendation_key: String,
pub title: String,
pub recommendation_type: String,
pub severity: String,
pub status: String,
pub confidence: Option<f64>,
pub impact_score: Option<f64>,
pub rationale: Option<String>,
pub proposed_sql: Option<String>,
pub explain_plan: Option<Value>,
pub metadata: Value,
pub first_seen_at: DateTime<Utc>,
pub last_seen_at: DateTime<Utc>,
pub applied_at: Option<DateTime<Utc>>,
pub applied_by: Option<String>,
pub apply_error: Option<String>,
pub run_id: Option<i64>,
}
#[derive(Debug, Clone, Serialize)]
pub struct QueryOptimizationRunRecord {
pub id: i64,
pub created_at: DateTime<Utc>,
pub started_at: DateTime<Utc>,
pub completed_at: Option<DateTime<Utc>>,
pub client_name: String,
pub trigger_source: String,
pub status: String,
pub recommendations_generated: i32,
pub error_message: Option<String>,
pub meta: Value,
}
#[derive(Debug, Clone, Serialize)]
pub struct QueryOptimizationActionRecord {
pub id: i64,
pub created_at: DateTime<Utc>,
pub completed_at: Option<DateTime<Utc>>,
pub recommendation_id: i64,
pub client_name: String,
pub action: String,
pub status: String,
pub actor: Option<String>,
pub executed_sql: Option<String>,
pub error_message: Option<String>,
pub meta: Value,
}
#[derive(Debug, Clone, Serialize)]
pub struct QueryOptimizationApplyResult {
pub recommendation: QueryOptimizationRecommendationRecord,
pub action: QueryOptimizationActionRecord,
}
struct IndexCandidate {
recommendation_key: &'static str,
index_name: &'static str,
table_name: &'static str,
title: &'static str,
base_severity: &'static str,
base_confidence: f64,
base_impact_score: f64,
rationale: &'static str,
proposed_sql: &'static str,
probe_sql: &'static str,
}
const GATEWAY_REQUEST_LOG_CANDIDATES: &[IndexCandidate] = &[
IndexCandidate {
recommendation_key: "idx:public.gateway_request_log:request_id",
index_name: "idx_gateway_request_log_request_id",
table_name: "gateway_request_log",
title: "Add index on gateway_request_log.request_id",
base_severity: "critical",
base_confidence: 0.96,
base_impact_score: 0.88,
rationale: "Request log completion updates filter by request_id. Without an index this can become a sequential scan under load.",
proposed_sql: "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_gateway_request_log_request_id ON public.gateway_request_log (request_id)",
probe_sql: "UPDATE public.gateway_request_log SET duration_ms = 1 WHERE request_id = 'athena_probe_request_id'",
},
IndexCandidate {
recommendation_key: "idx:public.gateway_request_log:created_at_desc",
index_name: "idx_gateway_request_log_created_at",
table_name: "gateway_request_log",
title: "Add recency index on gateway_request_log.created_at",
base_severity: "high",
base_confidence: 0.88,
base_impact_score: 0.72,
rationale: "Operators and dashboards frequently read recent request logs by created_at DESC.",
proposed_sql: "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_gateway_request_log_created_at ON public.gateway_request_log (created_at DESC)",
probe_sql: "SELECT id FROM public.gateway_request_log ORDER BY created_at DESC LIMIT 250",
},
IndexCandidate {
recommendation_key: "idx:public.gateway_request_log:client_created_at_desc",
index_name: "idx_gateway_request_log_client_created_at",
table_name: "gateway_request_log",
title: "Add tenant+recency index on gateway_request_log(client, created_at)",
base_severity: "high",
base_confidence: 0.86,
base_impact_score: 0.69,
rationale: "Client-scoped diagnostics often combine client filtering with recency ordering.",
proposed_sql: "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_gateway_request_log_client_created_at ON public.gateway_request_log (client, created_at DESC) WHERE client IS NOT NULL",
probe_sql: "SELECT id FROM public.gateway_request_log WHERE client = 'athena_probe_client' ORDER BY created_at DESC LIMIT 250",
},
];
#[derive(Debug, Clone, Default)]
struct GatewayRequestLogStats {
total_requests_24h: i64,
slow_requests_24h: i64,
p95_duration_ms: Option<f64>,
max_duration_ms: Option<i64>,
estimated_row_count: Option<f64>,
}
impl GatewayRequestLogStats {
fn to_json(&self) -> Value {
json!({
"total_requests_24h": self.total_requests_24h,
"slow_requests_24h": self.slow_requests_24h,
"p95_duration_ms": self.p95_duration_ms,
"max_duration_ms": self.max_duration_ms,
"estimated_row_count": self.estimated_row_count,
})
}
}
#[derive(Debug, Clone, Default)]
struct ExplainPlanSummary {
root_node_type: Option<String>,
total_cost: Option<f64>,
plan_rows: Option<f64>,
max_total_cost: Option<f64>,
max_plan_rows: Option<f64>,
has_seq_scan: bool,
has_sort: bool,
has_index_scan: bool,
has_bitmap_scan: bool,
relation_names: Vec<String>,
}
impl ExplainPlanSummary {
fn to_json(&self) -> Value {
json!({
"root_node_type": self.root_node_type,
"total_cost": self.total_cost,
"plan_rows": self.plan_rows,
"max_total_cost": self.max_total_cost,
"max_plan_rows": self.max_plan_rows,
"has_seq_scan": self.has_seq_scan,
"has_sort": self.has_sort,
"has_index_scan": self.has_index_scan,
"has_bitmap_scan": self.has_bitmap_scan,
"relation_names": self.relation_names,
})
}
}
fn push_unique_relation(summary: &mut ExplainPlanSummary, relation_name: &str) {
if summary
.relation_names
.iter()
.any(|existing| existing == relation_name)
{
return;
}
summary.relation_names.push(relation_name.to_string());
}
fn collect_plan_stats(node: &Value, summary: &mut ExplainPlanSummary) {
if let Some(node_type) = node.get("Node Type").and_then(Value::as_str) {
match node_type {
"Seq Scan" => summary.has_seq_scan = true,
"Sort" => summary.has_sort = true,
"Index Scan" | "Index Only Scan" => summary.has_index_scan = true,
"Bitmap Heap Scan" | "Bitmap Index Scan" => summary.has_bitmap_scan = true,
_ => {}
}
}
if let Some(cost) = node.get("Total Cost").and_then(Value::as_f64) {
summary.max_total_cost = Some(
summary
.max_total_cost
.map_or(cost, |current| current.max(cost)),
);
}
if let Some(plan_rows) = node.get("Plan Rows").and_then(Value::as_f64) {
summary.max_plan_rows = Some(
summary
.max_plan_rows
.map_or(plan_rows, |current| current.max(plan_rows)),
);
}
if let Some(relation_name) = node.get("Relation Name").and_then(Value::as_str) {
push_unique_relation(summary, relation_name);
}
if let Some(children) = node.get("Plans").and_then(Value::as_array) {
for child in children {
collect_plan_stats(child, summary);
}
}
}
fn summarize_explain_plan(explain_plan: &Value) -> Option<ExplainPlanSummary> {
let root = explain_plan
.as_array()
.and_then(|plans| plans.first())
.and_then(|entry| entry.get("Plan"))?;
let mut summary = ExplainPlanSummary {
root_node_type: root
.get("Node Type")
.and_then(Value::as_str)
.map(str::to_string),
total_cost: root.get("Total Cost").and_then(Value::as_f64),
plan_rows: root.get("Plan Rows").and_then(Value::as_f64),
..ExplainPlanSummary::default()
};
collect_plan_stats(root, &mut summary);
if summary.total_cost.is_none() {
summary.total_cost = summary.max_total_cost;
}
if summary.plan_rows.is_none() {
summary.plan_rows = summary.max_plan_rows;
}
Some(summary)
}
fn severity_rank(severity: &str) -> i32 {
match severity {
"critical" => 4,
"high" => 3,
"medium" => 2,
"low" => 1,
_ => 1,
}
}
fn max_severity(base: &str, computed: &str) -> String {
if severity_rank(base) >= severity_rank(computed) {
base.to_string()
} else {
computed.to_string()
}
}
fn severity_from_impact(score: f64) -> &'static str {
if score >= 0.90 {
"critical"
} else if score >= 0.72 {
"high"
} else if score >= 0.45 {
"medium"
} else {
"low"
}
}
fn clamp_unit(value: f64) -> f64 {
value.clamp(0.0, 1.0)
}
fn rank_candidate(
candidate: &IndexCandidate,
explain_summary: Option<&ExplainPlanSummary>,
stats: &GatewayRequestLogStats,
) -> (String, f64, f64, String, Value) {
let mut confidence: f64 = candidate.base_confidence;
let mut impact_score: f64 = candidate.base_impact_score;
let mut notes: Vec<String> = Vec::new();
if let Some(summary) = explain_summary {
if summary.has_seq_scan {
impact_score += 0.16;
confidence += 0.06;
notes.push("EXPLAIN includes a sequential scan on the probe path.".to_string());
}
if summary.has_sort && !summary.has_index_scan {
impact_score += 0.08;
notes.push("EXPLAIN includes a sort without index-backed ordering.".to_string());
}
if summary.has_index_scan || summary.has_bitmap_scan {
impact_score -= 0.05;
confidence -= 0.03;
notes.push(
"EXPLAIN already reports index-assisted access for part of the probe.".to_string(),
);
}
if let Some(total_cost) = summary.max_total_cost.or(summary.total_cost) {
if total_cost >= 100_000.0 {
impact_score += 0.16;
notes.push(format!(
"Estimated planner total cost is high ({total_cost:.0})."
));
} else if total_cost >= 10_000.0 {
impact_score += 0.08;
notes.push(format!(
"Estimated planner total cost is elevated ({total_cost:.0})."
));
}
}
if let Some(plan_rows) = summary.max_plan_rows.or(summary.plan_rows) {
if plan_rows >= 1_000_000.0 {
impact_score += 0.12;
notes.push(format!(
"Planner estimates very large row scans ({plan_rows:.0} rows)."
));
} else if plan_rows >= 50_000.0 {
impact_score += 0.06;
notes.push(format!(
"Planner estimates broad scans ({plan_rows:.0} rows)."
));
}
}
} else {
confidence -= 0.08;
notes.push(
"EXPLAIN plan could not be collected; ranking falls back to heuristic scoring."
.to_string(),
);
}
if let Some(p95_duration_ms) = stats.p95_duration_ms {
if p95_duration_ms >= 20_000.0 {
impact_score += 0.12;
notes.push(format!(
"Observed p95 request duration is severe ({p95_duration_ms:.0} ms)."
));
} else if p95_duration_ms >= 5_000.0 {
impact_score += 0.07;
notes.push(format!(
"Observed p95 request duration is elevated ({p95_duration_ms:.0} ms)."
));
}
}
if stats.slow_requests_24h >= 1_000 {
impact_score += 0.12;
notes.push(format!(
"Detected {} slow requests in the last 24h.",
stats.slow_requests_24h
));
} else if stats.slow_requests_24h >= 100 {
impact_score += 0.07;
notes.push(format!(
"Detected {} slow requests in the last 24h.",
stats.slow_requests_24h
));
}
if let Some(estimated_rows) = stats.estimated_row_count {
if estimated_rows >= 1_000_000.0 {
impact_score += 0.10;
notes.push(format!(
"gateway_request_log table estimate is high ({estimated_rows:.0} rows)."
));
} else if estimated_rows >= 100_000.0 {
impact_score += 0.05;
notes.push(format!(
"gateway_request_log table estimate is significant ({estimated_rows:.0} rows)."
));
}
}
confidence = clamp_unit(confidence);
impact_score = clamp_unit(impact_score);
let computed_severity = severity_from_impact(impact_score);
let severity = max_severity(candidate.base_severity, computed_severity);
let rationale = if notes.is_empty() {
candidate.rationale.to_string()
} else {
format!("{} {}", candidate.rationale, notes.join(" "))
};
let ranking_meta = json!({
"strategy": "heuristic_plus_explain",
"computed_severity": computed_severity,
"base_severity": candidate.base_severity,
"confidence": confidence,
"impact_score": impact_score,
"notes": notes,
});
(severity, confidence, impact_score, rationale, ranking_meta)
}
async fn explain_probe_query(pool: &PgPool, probe_sql: &str) -> Result<Option<Value>, sqlx::Error> {
let explain_sql = format!("EXPLAIN (FORMAT JSON) {probe_sql}");
let row = sqlx::query(&explain_sql).fetch_optional(pool).await?;
let Some(row) = row else {
return Ok(None);
};
let plan: Json<Value> = row.try_get(0)?;
Ok(Some(plan.0))
}
async fn load_gateway_request_log_stats(
pool: &PgPool,
client_name: &str,
) -> GatewayRequestLogStats {
let mut stats = GatewayRequestLogStats::default();
if let Ok(row) = sqlx::query(
r#"
SELECT
COUNT(*) FILTER (WHERE created_at >= now() - interval '24 hours') AS total_requests_24h,
COUNT(*) FILTER (
WHERE created_at >= now() - interval '24 hours'
AND duration_ms IS NOT NULL
AND duration_ms >= 1000
) AS slow_requests_24h,
percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms)
FILTER (
WHERE created_at >= now() - interval '24 hours'
AND duration_ms IS NOT NULL
) AS p95_duration_ms,
MAX(duration_ms) FILTER (WHERE created_at >= now() - interval '24 hours') AS max_duration_ms
FROM gateway_request_log
WHERE lower(client) = lower($1)
"#,
)
.bind(client_name)
.fetch_one(pool)
.await
{
stats.total_requests_24h = row.try_get("total_requests_24h").unwrap_or(0);
stats.slow_requests_24h = row.try_get("slow_requests_24h").unwrap_or(0);
stats.p95_duration_ms = row.try_get("p95_duration_ms").ok();
stats.max_duration_ms = row.try_get("max_duration_ms").ok();
}
if let Ok(row) = sqlx::query(
r#"
SELECT c.reltuples::double precision AS estimated_row_count
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
AND c.relname = 'gateway_request_log'
LIMIT 1
"#,
)
.fetch_one(pool)
.await
{
stats.estimated_row_count = row.try_get("estimated_row_count").ok();
}
stats
}
fn map_recommendation_row(
row: &PgRow,
) -> Result<QueryOptimizationRecommendationRecord, sqlx::Error> {
Ok(QueryOptimizationRecommendationRecord {
id: row.try_get("id")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
client_name: row.try_get("client_name")?,
recommendation_key: row.try_get("recommendation_key")?,
title: row.try_get("title")?,
recommendation_type: row.try_get("recommendation_type")?,
severity: row.try_get("severity")?,
status: row.try_get("status")?,
confidence: row.try_get("confidence")?,
impact_score: row.try_get("impact_score")?,
rationale: row.try_get("rationale")?,
proposed_sql: row.try_get("proposed_sql")?,
explain_plan: row.try_get("explain_plan")?,
metadata: row.try_get("metadata")?,
first_seen_at: row.try_get("first_seen_at")?,
last_seen_at: row.try_get("last_seen_at")?,
applied_at: row.try_get("applied_at")?,
applied_by: row.try_get("applied_by")?,
apply_error: row.try_get("apply_error")?,
run_id: row.try_get("run_id")?,
})
}
fn map_run_row(row: &PgRow) -> Result<QueryOptimizationRunRecord, sqlx::Error> {
Ok(QueryOptimizationRunRecord {
id: row.try_get("id")?,
created_at: row.try_get("created_at")?,
started_at: row.try_get("started_at")?,
completed_at: row.try_get("completed_at")?,
client_name: row.try_get("client_name")?,
trigger_source: row.try_get("trigger_source")?,
status: row.try_get("status")?,
recommendations_generated: row.try_get("recommendations_generated")?,
error_message: row.try_get("error_message")?,
meta: row.try_get("meta")?,
})
}
fn map_action_row(row: &PgRow) -> Result<QueryOptimizationActionRecord, sqlx::Error> {
Ok(QueryOptimizationActionRecord {
id: row.try_get("id")?,
created_at: row.try_get("created_at")?,
completed_at: row.try_get("completed_at")?,
recommendation_id: row.try_get("recommendation_id")?,
client_name: row.try_get("client_name")?,
action: row.try_get("action")?,
status: row.try_get("status")?,
actor: row.try_get("actor")?,
executed_sql: row.try_get("executed_sql")?,
error_message: row.try_get("error_message")?,
meta: row.try_get("meta")?,
})
}
async fn index_exists(
pool: &PgPool,
table_name: &str,
index_name: &str,
) -> Result<bool, sqlx::Error> {
let row = sqlx::query(
r#"
SELECT 1
FROM pg_indexes
WHERE schemaname = 'public'
AND tablename = $1
AND indexname = $2
LIMIT 1
"#,
)
.bind(table_name)
.bind(index_name)
.fetch_optional(pool)
.await?;
Ok(row.is_some())
}
fn normalize_sql_for_safety(statement: &str) -> String {
statement
.split_whitespace()
.map(str::to_ascii_lowercase)
.collect::<Vec<String>>()
.join(" ")
}
fn is_safe_apply_statement(statement: &str) -> bool {
let normalized = normalize_sql_for_safety(statement);
if normalized.is_empty() || normalized.contains(';') {
return false;
}
let starts_like_index_create = normalized.starts_with("create index concurrent")
|| normalized.starts_with("create index concurrently")
|| normalized.starts_with("create unique index concurrent")
|| normalized.starts_with("create unique index concurrently");
starts_like_index_create
&& normalized.contains(" if not exists ")
&& (normalized.contains(" on public.gateway_request_log ")
|| normalized.contains(" on gateway_request_log "))
}
pub async fn list_query_optimization_recommendations(
pool: &PgPool,
client_name: &str,
status: Option<&str>,
limit: i64,
offset: i64,
) -> Result<Vec<QueryOptimizationRecommendationRecord>, sqlx::Error> {
let rows = sqlx::query(
r#"
SELECT
id,
created_at,
updated_at,
client_name,
recommendation_key,
title,
recommendation_type,
severity,
status,
confidence,
impact_score,
rationale,
proposed_sql,
explain_plan,
metadata,
first_seen_at,
last_seen_at,
applied_at,
applied_by,
apply_error,
run_id
FROM query_optimization_recommendations
WHERE lower(client_name) = lower($1)
AND ($2::text IS NULL OR status = $2)
ORDER BY
CASE severity
WHEN 'critical' THEN 1
WHEN 'high' THEN 2
WHEN 'medium' THEN 3
ELSE 4
END,
updated_at DESC
LIMIT $3
OFFSET $4
"#,
)
.bind(client_name)
.bind(status)
.bind(limit)
.bind(offset)
.fetch_all(pool)
.await?;
rows.iter().map(map_recommendation_row).collect()
}
pub async fn list_query_optimization_runs(
pool: &PgPool,
client_name: &str,
limit: i64,
offset: i64,
) -> Result<Vec<QueryOptimizationRunRecord>, sqlx::Error> {
let rows = sqlx::query(
r#"
SELECT
id,
created_at,
started_at,
completed_at,
client_name,
trigger_source,
status,
recommendations_generated,
error_message,
meta
FROM query_optimization_runs
WHERE lower(client_name) = lower($1)
ORDER BY created_at DESC
LIMIT $2
OFFSET $3
"#,
)
.bind(client_name)
.bind(limit)
.bind(offset)
.fetch_all(pool)
.await?;
rows.iter().map(map_run_row).collect()
}
pub async fn refresh_query_optimization_recommendations(
pool: &PgPool,
client_name: &str,
trigger_source: &str,
actor: Option<&str>,
) -> Result<
(
QueryOptimizationRunRecord,
Vec<QueryOptimizationRecommendationRecord>,
),
sqlx::Error,
> {
let gateway_log_stats = load_gateway_request_log_stats(pool, client_name).await;
let run_meta = json!({
"actor": actor,
"source": "heuristic_plus_explain",
"gateway_request_log_stats": gateway_log_stats.to_json(),
});
let run_row = sqlx::query(
r#"
INSERT INTO query_optimization_runs (
client_name,
trigger_source,
status,
meta,
started_at
)
VALUES ($1, $2, 'running', $3, now())
RETURNING
id,
created_at,
started_at,
completed_at,
client_name,
trigger_source,
status,
recommendations_generated,
error_message,
meta
"#,
)
.bind(client_name)
.bind(trigger_source)
.bind(run_meta)
.fetch_one(pool)
.await?;
let run_id: i64 = run_row.try_get("id")?;
let generated = async {
let mut recommendations = Vec::new();
for candidate in GATEWAY_REQUEST_LOG_CANDIDATES {
if index_exists(pool, candidate.table_name, candidate.index_name).await? {
continue;
}
let explain_plan = explain_probe_query(pool, candidate.probe_sql).await?;
let explain_summary = explain_plan.as_ref().and_then(summarize_explain_plan);
let (severity, confidence, impact_score, rationale, ranking_meta) =
rank_candidate(candidate, explain_summary.as_ref(), &gateway_log_stats);
let metadata = json!({
"source": "heuristic_plus_explain",
"table_name": candidate.table_name,
"index_name": candidate.index_name,
"probe_sql": candidate.probe_sql,
"plan_summary": explain_summary.map(|summary| summary.to_json()),
"gateway_request_log_stats": gateway_log_stats.to_json(),
"ranking": ranking_meta,
});
let explain_plan_param: Option<Json<Value>> = explain_plan.clone().map(Json);
let row = sqlx::query(
r#"
INSERT INTO query_optimization_recommendations (
client_name,
recommendation_key,
title,
recommendation_type,
severity,
status,
confidence,
impact_score,
rationale,
proposed_sql,
explain_plan,
metadata,
first_seen_at,
last_seen_at,
run_id
)
VALUES (
$1,
$2,
$3,
'index',
$4,
'open',
$5,
$6,
$7,
$8,
$9,
$10,
now(),
now(),
$11
)
ON CONFLICT (client_name, recommendation_key)
DO UPDATE SET
client_name = EXCLUDED.client_name,
title = EXCLUDED.title,
severity = EXCLUDED.severity,
confidence = EXCLUDED.confidence,
impact_score = EXCLUDED.impact_score,
rationale = EXCLUDED.rationale,
proposed_sql = EXCLUDED.proposed_sql,
explain_plan = EXCLUDED.explain_plan,
metadata = EXCLUDED.metadata,
run_id = EXCLUDED.run_id,
last_seen_at = now(),
updated_at = now(),
status = CASE
WHEN query_optimization_recommendations.status = 'applied' THEN 'applied'
ELSE 'open'
END,
apply_error = NULL
RETURNING
id,
created_at,
updated_at,
client_name,
recommendation_key,
title,
recommendation_type,
severity,
status,
confidence,
impact_score,
rationale,
proposed_sql,
explain_plan,
metadata,
first_seen_at,
last_seen_at,
applied_at,
applied_by,
apply_error,
run_id
"#,
)
.bind(client_name)
.bind(candidate.recommendation_key)
.bind(candidate.title)
.bind(severity)
.bind(confidence)
.bind(impact_score)
.bind(rationale)
.bind(candidate.proposed_sql)
.bind(explain_plan_param)
.bind(metadata)
.bind(run_id)
.fetch_one(pool)
.await?;
recommendations.push(map_recommendation_row(&row)?);
}
Ok::<Vec<QueryOptimizationRecommendationRecord>, sqlx::Error>(recommendations)
}
.await;
match generated {
Ok(recommendations) => {
let completed = sqlx::query(
r#"
UPDATE query_optimization_runs
SET status = 'completed',
recommendations_generated = $2,
completed_at = now()
WHERE id = $1
RETURNING
id,
created_at,
started_at,
completed_at,
client_name,
trigger_source,
status,
recommendations_generated,
error_message,
meta
"#,
)
.bind(run_id)
.bind(recommendations.len() as i32)
.fetch_one(pool)
.await?;
Ok((map_run_row(&completed)?, recommendations))
}
Err(err) => {
let _ = sqlx::query(
r#"
UPDATE query_optimization_runs
SET status = 'failed',
error_message = $2,
completed_at = now()
WHERE id = $1
"#,
)
.bind(run_id)
.bind(err.to_string())
.execute(pool)
.await;
Err(err)
}
}
}
pub async fn apply_query_optimization_recommendation(
pool: &PgPool,
client_name: &str,
recommendation_id: i64,
actor: Option<&str>,
) -> Result<Option<QueryOptimizationApplyResult>, sqlx::Error> {
let recommendation_row = sqlx::query(
r#"
SELECT
id,
created_at,
updated_at,
client_name,
recommendation_key,
title,
recommendation_type,
severity,
status,
confidence,
impact_score,
rationale,
proposed_sql,
explain_plan,
metadata,
first_seen_at,
last_seen_at,
applied_at,
applied_by,
apply_error,
run_id
FROM query_optimization_recommendations
WHERE id = $1
AND lower(client_name) = lower($2)
LIMIT 1
"#,
)
.bind(recommendation_id)
.bind(client_name)
.fetch_optional(pool)
.await?;
let Some(recommendation_row) = recommendation_row else {
return Ok(None);
};
let recommendation = map_recommendation_row(&recommendation_row)?;
let executed_sql = recommendation.proposed_sql.clone();
let action_row = sqlx::query(
r#"
INSERT INTO query_optimization_actions (
recommendation_id,
client_name,
action,
status,
actor,
executed_sql,
meta
)
VALUES ($1, $2, 'apply', 'running', $3, $4, $5)
RETURNING
id,
created_at,
completed_at,
recommendation_id,
client_name,
action,
status,
actor,
executed_sql,
error_message,
meta
"#,
)
.bind(recommendation.id)
.bind(client_name)
.bind(actor)
.bind(executed_sql.clone())
.bind(json!({ "requested_by": actor }))
.fetch_one(pool)
.await?;
let action_id: i64 = action_row.try_get("id")?;
let apply_result: Result<(), String> = match recommendation.status.as_str() {
"applied" => Ok(()),
_ => match executed_sql.clone() {
None => Err("Recommendation does not include executable SQL.".to_string()),
Some(sql) if !is_safe_apply_statement(&sql) => Err(
"Refusing to execute SQL that is outside safe index-creation guardrails."
.to_string(),
),
Some(sql) => sqlx::query(&sql)
.execute(pool)
.await
.map(|_| ())
.map_err(|err| err.to_string()),
},
};
let final_action_row = match apply_result {
Ok(()) => {
sqlx::query(
r#"
UPDATE query_optimization_recommendations
SET status = 'applied',
applied_at = now(),
applied_by = $2,
apply_error = NULL,
updated_at = now()
WHERE id = $1
"#,
)
.bind(recommendation.id)
.bind(actor)
.execute(pool)
.await?;
sqlx::query(
r#"
UPDATE query_optimization_actions
SET status = CASE
WHEN $2 = 'applied' THEN 'skipped'
ELSE 'success'
END,
completed_at = now(),
error_message = NULL
WHERE id = $1
RETURNING
id,
created_at,
completed_at,
recommendation_id,
client_name,
action,
status,
actor,
executed_sql,
error_message,
meta
"#,
)
.bind(action_id)
.bind(recommendation.status)
.fetch_one(pool)
.await?
}
Err(error_message) => {
sqlx::query(
r#"
UPDATE query_optimization_recommendations
SET apply_error = $2,
updated_at = now()
WHERE id = $1
"#,
)
.bind(recommendation.id)
.bind(&error_message)
.execute(pool)
.await?;
sqlx::query(
r#"
UPDATE query_optimization_actions
SET status = 'failed',
completed_at = now(),
error_message = $2
WHERE id = $1
RETURNING
id,
created_at,
completed_at,
recommendation_id,
client_name,
action,
status,
actor,
executed_sql,
error_message,
meta
"#,
)
.bind(action_id)
.bind(error_message)
.fetch_one(pool)
.await?
}
};
let refreshed_recommendation_row = sqlx::query(
r#"
SELECT
id,
created_at,
updated_at,
client_name,
recommendation_key,
title,
recommendation_type,
severity,
status,
confidence,
impact_score,
rationale,
proposed_sql,
explain_plan,
metadata,
first_seen_at,
last_seen_at,
applied_at,
applied_by,
apply_error,
run_id
FROM query_optimization_recommendations
WHERE id = $1
"#,
)
.bind(recommendation.id)
.fetch_one(pool)
.await?;
Ok(Some(QueryOptimizationApplyResult {
recommendation: map_recommendation_row(&refreshed_recommendation_row)?,
action: map_action_row(&final_action_row)?,
}))
}