#![allow(clippy::disallowed_methods)]
use sqlx::PgPool;
use tracing::{debug, error, info};
/// Ensures signal partitions exist for the current month and the three months after it.
///
/// Calls the `forge_signals_ensure_partition` SQL function once per month, passing the
/// first day of that month (`date_trunc('month', CURRENT_DATE)` plus 0–3 months). The four
/// statements are awaited together through `tokio::join!`, so each may use its own pool
/// connection.
///
/// This is best-effort: a failure is logged per month with `error!` and never returned to
/// the caller. The "verified" debug line is emitted only when every call succeeded, so the
/// log never reports success right after reporting a failure.
pub async fn ensure_partitions(pool: &PgPool) {
    let (current, plus_one, plus_two, plus_three) = tokio::join!(
        sqlx::query(
            "SELECT forge_signals_ensure_partition(date_trunc('month', CURRENT_DATE)::date)",
        )
        .execute(pool),
        sqlx::query(
            "SELECT forge_signals_ensure_partition((date_trunc('month', CURRENT_DATE) + interval '1 month')::date)",
        )
        .execute(pool),
        sqlx::query(
            "SELECT forge_signals_ensure_partition((date_trunc('month', CURRENT_DATE) + interval '2 months')::date)",
        )
        .execute(pool),
        sqlx::query(
            "SELECT forge_signals_ensure_partition((date_trunc('month', CURRENT_DATE) + interval '3 months')::date)",
        )
        .execute(pool),
    );

    // Keep each label next to its own result so the two cannot drift out of order.
    let outcomes = [
        ("current", current),
        ("+1 month", plus_one),
        ("+2 months", plus_two),
        ("+3 months", plus_three),
    ];

    let mut all_ok = true;
    for (label, outcome) in outcomes {
        if let Err(e) = outcome {
            all_ok = false;
            error!(error = %e, month = label, "failed to ensure signal partition");
        }
    }

    // Only claim success when nothing failed; the per-month errors above already cover
    // the failure case.
    if all_ok {
        debug!("signal partitions verified (current + 3 months ahead)");
    }
}
pub async fn drop_old_partitions(pool: &PgPool, retention_days: u32) {
let result = sqlx::query_scalar::<_, i32>("SELECT forge_signals_drop_old_partitions($1)")
.bind(retention_days as i32)
.fetch_one(pool)
.await;
match result {
Ok(dropped) if dropped > 0 => {
info!(dropped, retention_days, "dropped old signal partitions");
}
Ok(_) => debug!(retention_days, "no old signal partitions to drop"),
Err(e) => error!(error = %e, "failed to drop old signal partitions"),
}
}
pub async fn check_default_partition(pool: &PgPool) {
let result = sqlx::query_scalar::<_, i64>("SELECT count(*) FROM forge_signals_events_default")
.fetch_one(pool)
.await;
match result {
Ok(0) => debug!("signals default partition empty"),
Ok(count) => error!(
misrouted_rows = count,
"signals: rows landed in forge_signals_events_default — a partition is missing for some range. \
These rows are excluded from retention drops; investigate the partition coverage."
),
Err(e) => error!(error = %e, "failed to inspect signals default partition"),
}
}