use sqlx::{PgPool, Row};
use ff_core::engine_error::EngineError;
use crate::budget;
use crate::error::map_sqlx_error;
use crate::reconcilers::ScanReport;
/// Maximum number of due budget rows fetched by a single `scan_tick` call;
/// it is bound as the `LIMIT` of the selection query.
const BATCH_SIZE: i64 = 20;
/// Runs one scan pass of the budget-reset reconciler for a single partition.
///
/// Selects up to `BATCH_SIZE` rows from `ff_budget_policy` in `partition_key`
/// whose `next_reset_at_ms` is set and not later than the current wall-clock
/// time, earliest first, and passes each `budget_id` to
/// [`budget::budget_reset_reconciler_apply`].
///
/// # Returns
///
/// A [`ScanReport`] in which `processed` counts the budgets whose apply call
/// returned `Ok` and `errors` counts those whose apply call returned `Err`.
///
/// # Errors
///
/// Returns an [`EngineError`] (via `map_sqlx_error`) when the selection query
/// fails or when the `budget_id` column of a row cannot be decoded. A failure
/// of an individual apply call is logged and counted, not propagated.
pub async fn scan_tick(pool: &PgPool, partition_key: i16) -> Result<ScanReport, EngineError> {
    const DUE_BUDGETS_SQL: &str = r#"
SELECT budget_id
FROM ff_budget_policy
WHERE partition_key = $1
AND next_reset_at_ms IS NOT NULL
AND next_reset_at_ms <= $2
ORDER BY next_reset_at_ms ASC
LIMIT $3
"#;

    // Milliseconds since the Unix epoch: falls back to 0 when the system
    // clock reads earlier than the epoch, and saturates at `i64::MAX` when
    // the value does not fit in an `i64`.
    let epoch_millis = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis());
    let now_ms = i64::try_from(epoch_millis).unwrap_or(i64::MAX);

    let due_rows = sqlx::query(DUE_BUDGETS_SQL)
        .bind(partition_key)
        .bind(now_ms)
        .bind(BATCH_SIZE)
        .fetch_all(pool)
        .await
        .map_err(map_sqlx_error)?;

    let mut report = ScanReport::default();
    for due_row in due_rows {
        // A row whose id cannot be decoded aborts the whole pass.
        let budget_id: String = due_row.try_get("budget_id").map_err(map_sqlx_error)?;

        let outcome =
            budget::budget_reset_reconciler_apply(pool, partition_key, &budget_id, now_ms).await;

        // One failing budget must not stop the rest of the batch: log it,
        // count it, and move on.
        if let Err(err) = outcome {
            tracing::warn!(
                partition = partition_key,
                budget_id = %budget_id,
                error = %err,
                "budget_reset reconciler: reset failed",
            );
            report.errors += 1;
        } else {
            report.processed += 1;
        }
    }

    Ok(report)
}