use awa_model::cron::{atomic_enqueue, list_cron_jobs, upsert_cron_job, CronJobRow};
use awa_model::{JobRow, PeriodicJob};
use chrono::Utc;
use croner::Cron;
use sqlx::pool::PoolConnection;
use sqlx::{PgPool, Postgres};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

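/// Background maintenance loop for the job queue.
///
/// Exactly one instance cluster-wide acts as leader, elected via a Postgres
/// advisory lock. While leading, it rescues stalled jobs, promotes
/// scheduled/retryable jobs, cleans up finished jobs, and evaluates cron
/// schedules for periodic jobs.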
pub struct MaintenanceService {
    pool: PgPool,
    cancel: CancellationToken,
    leader: Arc<AtomicBool>,
    periodic_jobs: Arc<Vec<PeriodicJob>>,
    in_flight: Arc<RwLock<HashMap<i64, Arc<AtomicBool>>>>,
    heartbeat_rescue_interval: Duration,
    deadline_rescue_interval: Duration,
    promote_interval: Duration,
    cleanup_interval: Duration,
    cron_sync_interval: Duration,
    cron_eval_interval: Duration,
    leader_check_interval: Duration,
    heartbeat_staleness: Duration,
    completed_retention: Duration,
    failed_retention: Duration,
}

impl MaintenanceService {
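    /// Creates a service with default timer intervals and retention windows.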
    pub fn new(
        pool: PgPool,
        leader: Arc<AtomicBool>,
        cancel: CancellationToken,
        periodic_jobs: Arc<Vec<PeriodicJob>>,
        in_flight: Arc<RwLock<HashMap<i64, Arc<AtomicBool>>>>,
    ) -> Self {
        Self {
            pool,
            cancel,
            leader,
            periodic_jobs,
            in_flight,
            heartbeat_rescue_interval: Duration::from_secs(30),
            deadline_rescue_interval: Duration::from_secs(30),
            promote_interval: Duration::from_secs(5),
            cleanup_interval: Duration::from_secs(60),
            cron_sync_interval: Duration::from_secs(60),
            cron_eval_interval: Duration::from_secs(1),
            leader_check_interval: Duration::from_secs(30),
            heartbeat_staleness: Duration::from_secs(90),
            completed_retention: Duration::from_secs(86_400),
            failed_retention: Duration::from_secs(259_200),
        }
    }

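    /// Runs until the cancellation token fires. Loops on leader election:
    /// followers retry every 10 seconds; the leader drives all maintenance
    /// timers and falls back to election if its lock connection dies.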
    pub async fn run(&self) {
        info!("Maintenance service starting");
        self.leader.store(false, Ordering::SeqCst);

        loop {
            let mut leader_conn = match self.try_become_leader().await {
                Ok(Some(conn)) => conn,
                Ok(None) => {
                    tokio::select! {
                        _ = self.cancel.cancelled() => {
                            debug!("Maintenance service shutting down (not leader)");
                            self.leader.store(false, Ordering::SeqCst);
                            return;
                        }
                        _ = tokio::time::sleep(Duration::from_secs(10)) => continue,
                    }
                }
                Err(err) => {
                    warn!(error = %err, "Failed to check leader status");
                    tokio::select! {
                        _ = self.cancel.cancelled() => {
                            debug!("Maintenance service shutting down (leader check failed)");
                            self.leader.store(false, Ordering::SeqCst);
                            return;
                        }
                        _ = tokio::time::sleep(Duration::from_secs(10)) => continue,
                    }
                }
            };

            debug!("Elected as maintenance leader");
            self.leader.store(true, Ordering::SeqCst);

            let mut heartbeat_rescue_timer = tokio::time::interval(self.heartbeat_rescue_interval);
            let mut deadline_rescue_timer = tokio::time::interval(self.deadline_rescue_interval);
            let mut promote_timer = tokio::time::interval(self.promote_interval);
            let mut cleanup_timer = tokio::time::interval(self.cleanup_interval);
            let mut cron_sync_timer = tokio::time::interval(self.cron_sync_interval);
            let mut cron_eval_timer = tokio::time::interval(self.cron_eval_interval);
            let mut leader_check_timer = tokio::time::interval(self.leader_check_interval);

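            // tokio's `interval` completes its first tick immediately; drain
            // that tick for each timer so every task waits a full period
            // after election instead of all firing at once.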
            heartbeat_rescue_timer.tick().await;
            deadline_rescue_timer.tick().await;
            promote_timer.tick().await;
            cleanup_timer.tick().await;
            cron_sync_timer.tick().await;
            cron_eval_timer.tick().await;
            leader_check_timer.tick().await;

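            // Push the configured periodic jobs to the database right away so
            // cron evaluation has rows to work with before the first sync tick.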
            self.sync_periodic_jobs_to_db().await;

            loop {
                tokio::select! {
                    _ = self.cancel.cancelled() => {
                        debug!("Maintenance service shutting down");
                        self.leader.store(false, Ordering::SeqCst);
                        let _ = Self::release_leader(&mut leader_conn).await;
                        return;
                    }
                    _ = heartbeat_rescue_timer.tick() => {
                        self.rescue_stale_heartbeats().await;
                    }
                    _ = deadline_rescue_timer.tick() => {
                        self.rescue_expired_deadlines().await;
                    }
                    _ = promote_timer.tick() => {
                        self.promote_scheduled().await;
                    }
                    _ = cleanup_timer.tick() => {
                        self.cleanup_completed().await;
                    }
                    _ = cron_sync_timer.tick() => {
                        self.sync_periodic_jobs_to_db().await;
                    }
                    _ = cron_eval_timer.tick() => {
                        self.evaluate_cron_schedules().await;
                    }
                    _ = leader_check_timer.tick() => {
                        if sqlx::query("SELECT 1").execute(&mut *leader_conn).await.is_err() {
                            warn!("Leader connection lost, re-entering election loop");
                            self.leader.store(false, Ordering::SeqCst);
                            break;
                        }
                    }
                }
            }
        }
    }

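    /// Advisory lock key for leader election (the bytes spell "AWA_MAIN").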
    const LOCK_KEY: i64 = 0x_4157_415f_4d41_494e;

    /// Tries to take the leader advisory lock on a dedicated connection.
    /// Returns the connection holding the lock, or `None` if another
    /// instance is already the leader.
    async fn try_become_leader(&self) -> Result<Option<PoolConnection<Postgres>>, sqlx::Error> {
        let mut conn = self.pool.acquire().await?;
        let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
            .bind(Self::LOCK_KEY)
            .fetch_one(&mut *conn)
            .await?;
        if result.0 {
            Ok(Some(conn))
        } else {
            Ok(None)
        }
    }

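    /// Releases the leader advisory lock held by `conn`.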
    async fn release_leader(conn: &mut PoolConnection<Postgres>) -> Result<(), sqlx::Error> {
        sqlx::query("SELECT pg_advisory_unlock($1)")
            .bind(Self::LOCK_KEY)
            .execute(&mut **conn)
            .await?;
        Ok(())
    }

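    /// Upserts the statically configured periodic jobs into the cron jobs
    /// table, logging (but not aborting on) individual failures.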
    #[tracing::instrument(skip(self), name = "maintenance.cron_sync")]
    async fn sync_periodic_jobs_to_db(&self) {
        if self.periodic_jobs.is_empty() {
            return;
        }

        for job in self.periodic_jobs.iter() {
            if let Err(err) = upsert_cron_job(&self.pool, job).await {
                error!(name = %job.name, error = %err, "Failed to sync periodic job");
            }
        }

        debug!(
            count = self.periodic_jobs.len(),
            "Synced periodic jobs to database"
        );
    }

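    /// Loads all cron rows and enqueues a job for any schedule whose latest
    /// fire time has passed. The observed `last_enqueued_at` is handed to
    /// `atomic_enqueue`, which returns `None` when another instance already
    /// claimed that fire.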
    #[tracing::instrument(skip(self), name = "maintenance.cron_eval")]
    async fn evaluate_cron_schedules(&self) {
        let cron_rows = match list_cron_jobs(&self.pool).await {
            Ok(rows) => rows,
            Err(err) => {
                error!(error = %err, "Failed to load cron jobs for evaluation");
                return;
            }
        };

        if cron_rows.is_empty() {
            return;
        }

        let now = Utc::now();

        for row in &cron_rows {
            let fire_time = match compute_fire_time(row, now) {
                Some(time) => time,
                None => continue,
            };

            match atomic_enqueue(&self.pool, &row.name, fire_time, row.last_enqueued_at).await {
                Ok(Some(job)) => {
                    info!(
                        cron_name = %row.name,
                        job_id = job.id,
                        fire_time = %fire_time,
                        "Enqueued periodic job"
                    );
                }
                Ok(None) => {
                    debug!(cron_name = %row.name, "Cron fire already claimed");
                }
                Err(err) => {
                    error!(
                        cron_name = %row.name,
                        error = %err,
                        "Failed to enqueue periodic job"
                    );
                }
            }
        }
    }

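    /// Moves running jobs whose heartbeat is older than `heartbeat_staleness`
    /// back to `retryable` (500 per pass, skipping locked rows) and signals
    /// cancellation to any handler still running locally.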
    #[tracing::instrument(skip(self), name = "maintenance.rescue_stale")]
    async fn rescue_stale_heartbeats(&self) {
        let staleness_str = format!("{} seconds", self.heartbeat_staleness.as_secs());
        match sqlx::query_as::<_, JobRow>(
            r#"
            UPDATE awa.jobs
            SET state = 'retryable',
                finalized_at = now(),
                heartbeat_at = NULL,
                deadline_at = NULL,
                errors = errors || jsonb_build_object(
                    'error', 'heartbeat stale: worker presumed dead',
                    'attempt', attempt,
                    'at', now()
                )::jsonb
            WHERE id IN (
                SELECT id FROM awa.jobs
                WHERE state = 'running'
                  AND heartbeat_at < now() - $1::interval
                LIMIT 500
                FOR UPDATE SKIP LOCKED
            )
            RETURNING *
            "#,
        )
        .bind(&staleness_str)
        .fetch_all(&self.pool)
        .await
        {
            Ok(rescued) if !rescued.is_empty() => {
                warn!(count = rescued.len(), "Rescued stale heartbeat jobs");
                self.signal_cancellation(&rescued).await;
            }
            Err(err) => {
                error!(error = %err, "Failed to rescue stale heartbeat jobs");
            }
            _ => {}
        }
    }

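    /// Moves running jobs past their hard `deadline_at` back to `retryable`
    /// (500 per pass) and signals cancellation to any handler still running
    /// locally.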
    #[tracing::instrument(skip(self), name = "maintenance.rescue_deadline")]
    async fn rescue_expired_deadlines(&self) {
        match sqlx::query_as::<_, JobRow>(
            r#"
            UPDATE awa.jobs
            SET state = 'retryable',
                finalized_at = now(),
                heartbeat_at = NULL,
                deadline_at = NULL,
                errors = errors || jsonb_build_object(
                    'error', 'hard deadline exceeded',
                    'attempt', attempt,
                    'at', now()
                )::jsonb
            WHERE id IN (
                SELECT id FROM awa.jobs
                WHERE state = 'running'
                  AND deadline_at IS NOT NULL
                  AND deadline_at < now()
                LIMIT 500
                FOR UPDATE SKIP LOCKED
            )
            RETURNING *
            "#,
        )
        .fetch_all(&self.pool)
        .await
        {
            Ok(rescued) if !rescued.is_empty() => {
                warn!(count = rescued.len(), "Rescued deadline-expired jobs");
                self.signal_cancellation(&rescued).await;
            }
            Err(err) => {
                error!(error = %err, "Failed to rescue deadline-expired jobs");
            }
            _ => {}
        }
    }

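    /// Flips the in-flight cancellation flag for any rescued job that is
    /// still executing in this process.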
    async fn signal_cancellation(&self, rescued_jobs: &[JobRow]) {
        let guard = self.in_flight.read().await;
        for job in rescued_jobs {
            if let Some(flag) = guard.get(&job.id) {
                flag.store(true, Ordering::SeqCst);
                debug!(job_id = job.id, "Signalled cancellation for rescued job");
            }
        }
    }

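    /// Flips `scheduled` and `retryable` jobs whose `run_at` has passed to
    /// `available` so workers can pick them up.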
    #[tracing::instrument(skip(self), name = "maintenance.promote")]
    async fn promote_scheduled(&self) {
        match sqlx::query(
            "UPDATE awa.jobs SET state = 'available' WHERE state = 'scheduled' AND run_at <= now()",
        )
        .execute(&self.pool)
        .await
        {
            Ok(result) if result.rows_affected() > 0 => {
                debug!(count = result.rows_affected(), "Promoted scheduled jobs");
            }
            Err(err) => {
                error!(error = %err, "Failed to promote scheduled jobs");
            }
            _ => {}
        }

        match sqlx::query(
            "UPDATE awa.jobs SET state = 'available' WHERE state = 'retryable' AND run_at <= now()",
        )
        .execute(&self.pool)
        .await
        {
            Ok(result) if result.rows_affected() > 0 => {
                debug!(
                    count = result.rows_affected(),
                    "Promoted retryable jobs (backoff elapsed)"
                );
            }
            Err(err) => {
                error!(error = %err, "Failed to promote retryable jobs");
            }
            _ => {}
        }
    }

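    /// Deletes finished jobs past their retention window, up to 1000 rows per
    /// pass: completed jobs after `completed_retention`, failed and cancelled
    /// jobs after `failed_retention`.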
    #[tracing::instrument(skip(self), name = "maintenance.cleanup")]
    async fn cleanup_completed(&self) {
        let completed_retention = format!("{} seconds", self.completed_retention.as_secs());
        let failed_retention = format!("{} seconds", self.failed_retention.as_secs());

        match sqlx::query(
            r#"
            DELETE FROM awa.jobs
            WHERE id IN (
                SELECT id FROM awa.jobs
                WHERE (state = 'completed' AND finalized_at < now() - $1::interval)
                   OR (state IN ('failed', 'cancelled') AND finalized_at < now() - $2::interval)
                LIMIT 1000
            )
            "#,
        )
        .bind(&completed_retention)
        .bind(&failed_retention)
        .execute(&self.pool)
        .await
        {
            Ok(result) if result.rows_affected() > 0 => {
                info!(count = result.rows_affected(), "Cleaned up old jobs");
            }
            Err(err) => {
                error!(error = %err, "Failed to clean up old jobs");
            }
            _ => {}
        }
    }
}

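/// Computes the most recent fire time for a cron row that is due but not yet
/// enqueued, walking the schedule in the row's timezone from
/// `last_enqueued_at` (or `created_at` for a new row) up to `now`. Returns
/// `None` when nothing is due or the row's expression or timezone is invalid.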
fn compute_fire_time(
    row: &CronJobRow,
    now: chrono::DateTime<Utc>,
) -> Option<chrono::DateTime<Utc>> {
    let cron = match Cron::new(&row.cron_expr).parse() {
        Ok(c) => c,
        Err(err) => {
            error!(cron_name = %row.name, error = %err, "Invalid cron expression in database");
            return None;
        }
    };

    let tz: chrono_tz::Tz = match row.timezone.parse() {
        Ok(tz) => tz,
        Err(err) => {
            error!(cron_name = %row.name, error = %err, "Invalid timezone in database");
            return None;
        }
    };

    let search_start = match row.last_enqueued_at {
        Some(last) => last.with_timezone(&tz),
        None => row.created_at.with_timezone(&tz),
    };

    let mut latest_fire: Option<chrono::DateTime<Utc>> = None;

    for fire_time in cron.iter_from(search_start) {
        let fire_utc = fire_time.with_timezone(&Utc);

        if fire_utc > now {
            break;
        }

        if let Some(last) = row.last_enqueued_at {
            if fire_utc <= last {
                continue;
            }
        }

        latest_fire = Some(fire_utc);
    }

    latest_fire
}