use sqlx::{Row, SqlitePool};
use ff_core::engine_error::EngineError;
use crate::budget;
use crate::errors::map_sqlx_error;
use crate::reconcilers::ScanReport;
/// Upper bound on the number of due budgets selected per `scan_tick` call;
/// bound to the `LIMIT ?` placeholder of the scan query.
const BATCH_SIZE: i64 = 20;
/// Value bound to the `partition_key = ?` filter of the scan query.
// NOTE(review): presumably the SQLite backend keeps everything in a single
// partition (0) — confirm against the schema / other reconcilers.
const PART: i64 = 0;
/// Runs one pass of the budget-reset scan.
///
/// Selects at most [`BATCH_SIZE`] rows from `ff_budget_policy` in partition
/// [`PART`] whose `next_reset_at_ms` is non-null and not later than `now_ms`,
/// ordered by `next_reset_at_ms` ascending, and calls
/// `budget::budget_reset_reconciler_apply` for each returned `budget_id`.
///
/// The returned [`ScanReport`] has `processed` incremented once per
/// successful apply and `errors` incremented once per failed apply. A failed
/// apply is logged at `warn` level and does not stop the loop.
///
/// # Errors
///
/// Returns the `map_sqlx_error`-converted error if the selection query fails
/// or if `budget_id` cannot be read from a row as a `String`. In the latter
/// case the tick ends immediately and the partially filled report is dropped.
pub async fn scan_tick(pool: &SqlitePool, now_ms: i64) -> Result<ScanReport, EngineError> {
    // Placeholders, in bind order: partition key, cutoff timestamp, row limit.
    const DUE_BUDGETS_SQL: &str = "SELECT budget_id \
         FROM ff_budget_policy \
         WHERE partition_key = ? \
         AND next_reset_at_ms IS NOT NULL \
         AND next_reset_at_ms <= ? \
         ORDER BY next_reset_at_ms ASC \
         LIMIT ?";

    let due_rows = sqlx::query(DUE_BUDGETS_SQL)
        .bind(PART)
        .bind(now_ms)
        .bind(BATCH_SIZE)
        .fetch_all(pool)
        .await
        .map_err(map_sqlx_error)?;

    let mut report = ScanReport::default();

    for due in due_rows {
        let budget_id: String = due.try_get("budget_id").map_err(map_sqlx_error)?;

        let outcome = budget::budget_reset_reconciler_apply(pool, &budget_id, now_ms).await;
        if let Err(e) = outcome {
            // A single failing budget must not stop the rest of the batch:
            // record it and move on.
            tracing::warn!(
                budget_id = %budget_id,
                error = %e,
                "sqlite budget_reset reconciler: reset failed",
            );
            report.errors += 1;
            continue;
        }

        report.processed += 1;
    }

    Ok(report)
}