//! awa_worker/maintenance.rs — leader-elected background maintenance tasks.

use awa_model::cron::{atomic_enqueue, list_cron_jobs, upsert_cron_job, CronJobRow};
use awa_model::{JobRow, PeriodicJob};
use chrono::Utc;
use croner::Cron;
use sqlx::pool::PoolConnection;
use sqlx::{PgPool, Postgres};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

/// Maintenance service: runs leader-elected background tasks.
///
/// Tasks: heartbeat rescue, deadline rescue, scheduled promotion, cleanup,
/// periodic job sync and evaluation.
///
/// Only the instance holding the Postgres advisory lock (the "leader")
/// executes the tasks; see `MaintenanceService::run`.
pub struct MaintenanceService {
    // Pool used by all maintenance queries.
    pool: PgPool,
    // Shutdown signal for the maintenance loop.
    cancel: CancellationToken,
    // Published leadership flag: set true while this instance holds the
    // advisory lock, false otherwise (readable by other components).
    leader: Arc<AtomicBool>,
    // Locally registered periodic (cron) job definitions, synced to the DB.
    periodic_jobs: Arc<Vec<PeriodicJob>>,
    /// In-flight job cancellation flags — used to signal deadline/heartbeat rescue
    /// to running handlers on this worker instance.
    in_flight: Arc<RwLock<HashMap<i64, Arc<AtomicBool>>>>,
    // How often to rescue 'running' jobs whose heartbeat has gone stale.
    heartbeat_rescue_interval: Duration,
    // How often to rescue 'running' jobs past their hard deadline.
    deadline_rescue_interval: Duration,
    // How often to promote due 'scheduled'/'retryable' jobs to 'available'.
    promote_interval: Duration,
    // How often to delete finalized jobs past their retention window.
    cleanup_interval: Duration,
    // How often to upsert local periodic schedules into awa.cron_jobs.
    cron_sync_interval: Duration,
    // How often to evaluate cron schedules for due fires.
    cron_eval_interval: Duration,
    // How often to verify the leader connection (and thus the lock) is alive.
    leader_check_interval: Duration,
    // Heartbeat age beyond which a 'running' job's worker is presumed dead.
    heartbeat_staleness: Duration,
    // Retention for 'completed' jobs before cleanup deletes them.
    completed_retention: Duration,
    // Retention for 'failed'/'cancelled' jobs before cleanup deletes them.
    failed_retention: Duration,
}
38
39impl MaintenanceService {
40    pub fn new(
41        pool: PgPool,
42        leader: Arc<AtomicBool>,
43        cancel: CancellationToken,
44        periodic_jobs: Arc<Vec<PeriodicJob>>,
45        in_flight: Arc<RwLock<HashMap<i64, Arc<AtomicBool>>>>,
46    ) -> Self {
47        Self {
48            pool,
49            cancel,
50            leader,
51            periodic_jobs,
52            in_flight,
53            heartbeat_rescue_interval: Duration::from_secs(30),
54            deadline_rescue_interval: Duration::from_secs(30),
55            promote_interval: Duration::from_secs(5),
56            cleanup_interval: Duration::from_secs(60),
57            cron_sync_interval: Duration::from_secs(60),
58            cron_eval_interval: Duration::from_secs(1),
59            leader_check_interval: Duration::from_secs(30),
60            heartbeat_staleness: Duration::from_secs(90),
61            completed_retention: Duration::from_secs(86400), // 24h
62            failed_retention: Duration::from_secs(259200),   // 72h
63        }
64    }
65
66    /// Run the maintenance loop. Attempts leader election first.
67    pub async fn run(&self) {
68        info!("Maintenance service starting");
69        self.leader.store(false, Ordering::SeqCst);
70
71        loop {
72            // Try to acquire advisory lock for leader election.
73            // We get back a dedicated connection that holds the lock.
74            let mut leader_conn = match self.try_become_leader().await {
75                Ok(Some(conn)) => conn,
76                Ok(None) => {
77                    // Not leader — back off and try again
78                    tokio::select! {
79                        _ = self.cancel.cancelled() => {
80                            debug!("Maintenance service shutting down (not leader)");
81                            self.leader.store(false, Ordering::SeqCst);
82                            return;
83                        }
84                        _ = tokio::time::sleep(Duration::from_secs(10)) => continue,
85                    }
86                }
87                Err(err) => {
88                    warn!(error = %err, "Failed to check leader status");
89                    tokio::select! {
90                        _ = self.cancel.cancelled() => {
91                            debug!("Maintenance service shutting down (leader check failed)");
92                            self.leader.store(false, Ordering::SeqCst);
93                            return;
94                        }
95                        _ = tokio::time::sleep(Duration::from_secs(10)) => continue,
96                    }
97                }
98            };
99
100            debug!("Elected as maintenance leader");
101            self.leader.store(true, Ordering::SeqCst);
102
103            // Run maintenance tasks as leader
104            let mut heartbeat_rescue_timer = tokio::time::interval(self.heartbeat_rescue_interval);
105            let mut deadline_rescue_timer = tokio::time::interval(self.deadline_rescue_interval);
106            let mut promote_timer = tokio::time::interval(self.promote_interval);
107            let mut cleanup_timer = tokio::time::interval(self.cleanup_interval);
108            let mut cron_sync_timer = tokio::time::interval(self.cron_sync_interval);
109            let mut cron_eval_timer = tokio::time::interval(self.cron_eval_interval);
110            let mut leader_check_timer = tokio::time::interval(self.leader_check_interval);
111
112            // Skip the first immediate tick
113            heartbeat_rescue_timer.tick().await;
114            deadline_rescue_timer.tick().await;
115            promote_timer.tick().await;
116            cleanup_timer.tick().await;
117            cron_sync_timer.tick().await;
118            cron_eval_timer.tick().await;
119            leader_check_timer.tick().await;
120
121            // Do an initial sync immediately on becoming leader
122            self.sync_periodic_jobs_to_db().await;
123
124            loop {
125                tokio::select! {
126                    _ = self.cancel.cancelled() => {
127                        debug!("Maintenance service shutting down");
128                        self.leader.store(false, Ordering::SeqCst);
129                        // Release leader lock on the same connection that acquired it.
130                        // If this fails, dropping the connection will release the lock anyway.
131                        let _ = Self::release_leader(&mut leader_conn).await;
132                        return;
133                    }
134                    _ = heartbeat_rescue_timer.tick() => {
135                        self.rescue_stale_heartbeats().await;
136                    }
137                    _ = deadline_rescue_timer.tick() => {
138                        self.rescue_expired_deadlines().await;
139                    }
140                    _ = promote_timer.tick() => {
141                        self.promote_scheduled().await;
142                    }
143                    _ = cleanup_timer.tick() => {
144                        self.cleanup_completed().await;
145                    }
146                    _ = cron_sync_timer.tick() => {
147                        self.sync_periodic_jobs_to_db().await;
148                    }
149                    _ = cron_eval_timer.tick() => {
150                        self.evaluate_cron_schedules().await;
151                    }
152                    _ = leader_check_timer.tick() => {
153                        // Verify leader connection is still alive.
154                        // The advisory lock is session-scoped: if the connection is alive,
155                        // the lock is held. If the query fails, the connection (and lock) are gone.
156                        if sqlx::query("SELECT 1").execute(&mut *leader_conn).await.is_err() {
157                            warn!("Leader connection lost, re-entering election loop");
158                            self.leader.store(false, Ordering::SeqCst);
159                            break;
160                        }
161                    }
162                }
163            }
164        }
165    }
166
167    /// Advisory lock key for Awa maintenance leader election.
168    const LOCK_KEY: i64 = 0x_4157_415f_4d41_494e; // "AWA_MAIN" in hex-ish
169
170    /// Try to acquire the advisory lock for leader election.
171    ///
172    /// Returns a dedicated connection holding the lock on success, or `None` if
173    /// another instance already holds the lock. The lock is session-scoped in
174    /// PostgreSQL, so it stays held as long as this connection is alive.
175    async fn try_become_leader(&self) -> Result<Option<PoolConnection<Postgres>>, sqlx::Error> {
176        let mut conn = self.pool.acquire().await?;
177        let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
178            .bind(Self::LOCK_KEY)
179            .fetch_one(&mut *conn)
180            .await?;
181        if result.0 {
182            Ok(Some(conn))
183        } else {
184            Ok(None)
185        }
186    }
187
188    /// Release the advisory lock on the same connection that acquired it.
189    ///
190    /// Dropping the connection also releases the lock (PG session-scoped behavior),
191    /// so this is a best-effort explicit release.
192    async fn release_leader(conn: &mut PoolConnection<Postgres>) -> Result<(), sqlx::Error> {
193        sqlx::query("SELECT pg_advisory_unlock($1)")
194            .bind(Self::LOCK_KEY)
195            .execute(&mut **conn)
196            .await?;
197        Ok(())
198    }
199
200    /// Sync all registered periodic job schedules to `awa.cron_jobs` via UPSERT.
201    ///
202    /// Additive only — does NOT delete schedules not in the local set (multi-deployment safe).
203    #[tracing::instrument(skip(self), name = "maintenance.cron_sync")]
204    async fn sync_periodic_jobs_to_db(&self) {
205        if self.periodic_jobs.is_empty() {
206            return;
207        }
208
209        for job in self.periodic_jobs.iter() {
210            if let Err(err) = upsert_cron_job(&self.pool, job).await {
211                error!(name = %job.name, error = %err, "Failed to sync periodic job");
212            }
213        }
214
215        debug!(
216            count = self.periodic_jobs.len(),
217            "Synced periodic jobs to database"
218        );
219    }
220
221    /// Evaluate all cron schedules and enqueue any that are due.
222    ///
223    /// For each schedule, computes the latest fire time ≤ now that is after
224    /// `last_enqueued_at`. If a fire is due, executes the atomic CTE to
225    /// mark + insert in one statement.
226    #[tracing::instrument(skip(self), name = "maintenance.cron_eval")]
227    async fn evaluate_cron_schedules(&self) {
228        let cron_rows = match list_cron_jobs(&self.pool).await {
229            Ok(rows) => rows,
230            Err(err) => {
231                error!(error = %err, "Failed to load cron jobs for evaluation");
232                return;
233            }
234        };
235
236        if cron_rows.is_empty() {
237            return;
238        }
239
240        let now = Utc::now();
241
242        for row in &cron_rows {
243            let fire_time = match compute_fire_time(row, now) {
244                Some(time) => time,
245                None => continue,
246            };
247
248            match atomic_enqueue(&self.pool, &row.name, fire_time, row.last_enqueued_at).await {
249                Ok(Some(job)) => {
250                    info!(
251                        cron_name = %row.name,
252                        job_id = job.id,
253                        fire_time = %fire_time,
254                        "Enqueued periodic job"
255                    );
256                }
257                Ok(None) => {
258                    // Another leader already claimed this fire — not an error
259                    debug!(cron_name = %row.name, "Cron fire already claimed");
260                }
261                Err(err) => {
262                    error!(
263                        cron_name = %row.name,
264                        error = %err,
265                        "Failed to enqueue periodic job"
266                    );
267                }
268            }
269        }
270    }
271
272    /// Rescue jobs with stale heartbeats (crash detection).
273    #[tracing::instrument(skip(self), name = "maintenance.rescue_stale")]
274    async fn rescue_stale_heartbeats(&self) {
275        let staleness_str = format!("{} seconds", self.heartbeat_staleness.as_secs());
276        match sqlx::query_as::<_, JobRow>(
277            r#"
278            UPDATE awa.jobs
279            SET state = 'retryable',
280                finalized_at = now(),
281                heartbeat_at = NULL,
282                deadline_at = NULL,
283                errors = errors || jsonb_build_object(
284                    'error', 'heartbeat stale: worker presumed dead',
285                    'attempt', attempt,
286                    'at', now()
287                )::jsonb
288            WHERE id IN (
289                SELECT id FROM awa.jobs
290                WHERE state = 'running'
291                  AND heartbeat_at < now() - $1::interval
292                LIMIT 500
293                FOR UPDATE SKIP LOCKED
294            )
295            RETURNING *
296            "#,
297        )
298        .bind(&staleness_str)
299        .fetch_all(&self.pool)
300        .await
301        {
302            Ok(rescued) if !rescued.is_empty() => {
303                warn!(count = rescued.len(), "Rescued stale heartbeat jobs");
304                // Signal cancellation to any rescued jobs still running on this instance
305                self.signal_cancellation(&rescued).await;
306            }
307            Err(err) => {
308                error!(error = %err, "Failed to rescue stale heartbeat jobs");
309            }
310            _ => {}
311        }
312    }
313
314    /// Rescue jobs that exceeded their hard deadline.
315    #[tracing::instrument(skip(self), name = "maintenance.rescue_deadline")]
316    async fn rescue_expired_deadlines(&self) {
317        match sqlx::query_as::<_, JobRow>(
318            r#"
319            UPDATE awa.jobs
320            SET state = 'retryable',
321                finalized_at = now(),
322                heartbeat_at = NULL,
323                deadline_at = NULL,
324                errors = errors || jsonb_build_object(
325                    'error', 'hard deadline exceeded',
326                    'attempt', attempt,
327                    'at', now()
328                )::jsonb
329            WHERE id IN (
330                SELECT id FROM awa.jobs
331                WHERE state = 'running'
332                  AND deadline_at IS NOT NULL
333                  AND deadline_at < now()
334                LIMIT 500
335                FOR UPDATE SKIP LOCKED
336            )
337            RETURNING *
338            "#,
339        )
340        .fetch_all(&self.pool)
341        .await
342        {
343            Ok(rescued) if !rescued.is_empty() => {
344                warn!(count = rescued.len(), "Rescued deadline-expired jobs");
345                // Signal cancellation so handlers see ctx.is_cancelled() == true
346                self.signal_cancellation(&rescued).await;
347            }
348            Err(err) => {
349                error!(error = %err, "Failed to rescue deadline-expired jobs");
350            }
351            _ => {}
352        }
353    }
354
355    /// Signal cancellation to any rescued jobs that are still running on this instance.
356    async fn signal_cancellation(&self, rescued_jobs: &[JobRow]) {
357        let guard = self.in_flight.read().await;
358        for job in rescued_jobs {
359            if let Some(flag) = guard.get(&job.id) {
360                flag.store(true, Ordering::SeqCst);
361                debug!(job_id = job.id, "Signalled cancellation for rescued job");
362            }
363        }
364    }
365
366    /// Promote scheduled jobs that are now due.
367    #[tracing::instrument(skip(self), name = "maintenance.promote")]
368    async fn promote_scheduled(&self) {
369        match sqlx::query(
370            "UPDATE awa.jobs SET state = 'available' WHERE state = 'scheduled' AND run_at <= now()",
371        )
372        .execute(&self.pool)
373        .await
374        {
375            Ok(result) if result.rows_affected() > 0 => {
376                debug!(count = result.rows_affected(), "Promoted scheduled jobs");
377            }
378            Err(err) => {
379                error!(error = %err, "Failed to promote scheduled jobs");
380            }
381            _ => {}
382        }
383
384        // Also promote retryable jobs whose backoff has elapsed
385        match sqlx::query(
386            "UPDATE awa.jobs SET state = 'available' WHERE state = 'retryable' AND run_at <= now()",
387        )
388        .execute(&self.pool)
389        .await
390        {
391            Ok(result) if result.rows_affected() > 0 => {
392                debug!(
393                    count = result.rows_affected(),
394                    "Promoted retryable jobs (backoff elapsed)"
395                );
396            }
397            Err(err) => {
398                error!(error = %err, "Failed to promote retryable jobs");
399            }
400            _ => {}
401        }
402    }
403
404    /// Clean up completed/failed/cancelled jobs past retention.
405    #[tracing::instrument(skip(self), name = "maintenance.cleanup")]
406    async fn cleanup_completed(&self) {
407        let completed_retention = format!("{} seconds", self.completed_retention.as_secs());
408        let failed_retention = format!("{} seconds", self.failed_retention.as_secs());
409
410        match sqlx::query(
411            r#"
412            DELETE FROM awa.jobs
413            WHERE id IN (
414                SELECT id FROM awa.jobs
415                WHERE (state = 'completed' AND finalized_at < now() - $1::interval)
416                   OR (state IN ('failed', 'cancelled') AND finalized_at < now() - $2::interval)
417                LIMIT 1000
418            )
419            "#,
420        )
421        .bind(&completed_retention)
422        .bind(&failed_retention)
423        .execute(&self.pool)
424        .await
425        {
426            Ok(result) if result.rows_affected() > 0 => {
427                info!(count = result.rows_affected(), "Cleaned up old jobs");
428            }
429            Err(err) => {
430                error!(error = %err, "Failed to clean up old jobs");
431            }
432            _ => {}
433        }
434    }
435}
436
437/// Compute the latest fire time for a cron job row, using its expression and timezone.
438///
439/// Returns `None` if no fire is due (next occurrence is in the future).
440fn compute_fire_time(
441    row: &CronJobRow,
442    now: chrono::DateTime<Utc>,
443) -> Option<chrono::DateTime<Utc>> {
444    let cron = match Cron::new(&row.cron_expr).parse() {
445        Ok(c) => c,
446        Err(err) => {
447            error!(cron_name = %row.name, error = %err, "Invalid cron expression in database");
448            return None;
449        }
450    };
451
452    let tz: chrono_tz::Tz = match row.timezone.parse() {
453        Ok(tz) => tz,
454        Err(err) => {
455            error!(cron_name = %row.name, error = %err, "Invalid timezone in database");
456            return None;
457        }
458    };
459
460    let search_start = match row.last_enqueued_at {
461        Some(last) => last.with_timezone(&tz),
462        // First registration: search from cron_jobs.created_at so that
463        // low-frequency schedules (weekly, monthly) still find their most
464        // recent past fire. Previously capped at 24h which missed them.
465        None => row.created_at.with_timezone(&tz),
466    };
467
468    let mut latest_fire: Option<chrono::DateTime<Utc>> = None;
469
470    for fire_time in cron.iter_from(search_start) {
471        let fire_utc = fire_time.with_timezone(&Utc);
472
473        if fire_utc > now {
474            break;
475        }
476
477        if let Some(last) = row.last_enqueued_at {
478            if fire_utc <= last {
479                continue;
480            }
481        }
482
483        latest_fire = Some(fire_utc);
484    }
485
486    latest_fire
487}