Skip to main content

awa_worker/
maintenance.rs

1use awa_model::JobRow;
2use sqlx::pool::PoolConnection;
3use sqlx::{PgPool, Postgres};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio_util::sync::CancellationToken;
8use tracing::{debug, error, info, warn};
9
10/// Maintenance service: runs leader-elected background tasks.
11///
12/// Tasks: heartbeat rescue, deadline rescue, scheduled promotion, cleanup.
13pub struct MaintenanceService {
14    pool: PgPool,
15    cancel: CancellationToken,
16    leader: Arc<AtomicBool>,
17    heartbeat_rescue_interval: Duration,
18    deadline_rescue_interval: Duration,
19    promote_interval: Duration,
20    cleanup_interval: Duration,
21    heartbeat_staleness: Duration,
22    completed_retention: Duration,
23    failed_retention: Duration,
24}
25
26impl MaintenanceService {
27    pub fn new(pool: PgPool, leader: Arc<AtomicBool>, cancel: CancellationToken) -> Self {
28        Self {
29            pool,
30            cancel,
31            leader,
32            heartbeat_rescue_interval: Duration::from_secs(30),
33            deadline_rescue_interval: Duration::from_secs(30),
34            promote_interval: Duration::from_secs(5),
35            cleanup_interval: Duration::from_secs(60),
36            heartbeat_staleness: Duration::from_secs(90),
37            completed_retention: Duration::from_secs(86400), // 24h
38            failed_retention: Duration::from_secs(259200),   // 72h
39        }
40    }
41
42    /// Run the maintenance loop. Attempts leader election first.
43    pub async fn run(&self) {
44        info!("Maintenance service starting");
45        self.leader.store(false, Ordering::SeqCst);
46
47        loop {
48            // Try to acquire advisory lock for leader election.
49            // We get back a dedicated connection that holds the lock.
50            let mut leader_conn = match self.try_become_leader().await {
51                Ok(Some(conn)) => conn,
52                Ok(None) => {
53                    // Not leader — back off and try again
54                    tokio::select! {
55                        _ = self.cancel.cancelled() => {
56                            debug!("Maintenance service shutting down (not leader)");
57                            self.leader.store(false, Ordering::SeqCst);
58                            return;
59                        }
60                        _ = tokio::time::sleep(Duration::from_secs(10)) => continue,
61                    }
62                }
63                Err(err) => {
64                    warn!(error = %err, "Failed to check leader status");
65                    tokio::select! {
66                        _ = self.cancel.cancelled() => {
67                            debug!("Maintenance service shutting down (leader check failed)");
68                            self.leader.store(false, Ordering::SeqCst);
69                            return;
70                        }
71                        _ = tokio::time::sleep(Duration::from_secs(10)) => continue,
72                    }
73                }
74            };
75
76            debug!("Elected as maintenance leader");
77            self.leader.store(true, Ordering::SeqCst);
78
79            // Run maintenance tasks as leader
80            let mut heartbeat_rescue_timer = tokio::time::interval(self.heartbeat_rescue_interval);
81            let mut deadline_rescue_timer = tokio::time::interval(self.deadline_rescue_interval);
82            let mut promote_timer = tokio::time::interval(self.promote_interval);
83            let mut cleanup_timer = tokio::time::interval(self.cleanup_interval);
84
85            // Skip the first immediate tick
86            heartbeat_rescue_timer.tick().await;
87            deadline_rescue_timer.tick().await;
88            promote_timer.tick().await;
89            cleanup_timer.tick().await;
90
91            loop {
92                tokio::select! {
93                    _ = self.cancel.cancelled() => {
94                        debug!("Maintenance service shutting down");
95                        self.leader.store(false, Ordering::SeqCst);
96                        // Release leader lock on the same connection that acquired it.
97                        // If this fails, dropping the connection will release the lock anyway.
98                        let _ = Self::release_leader(&mut leader_conn).await;
99                        return;
100                    }
101                    _ = heartbeat_rescue_timer.tick() => {
102                        self.rescue_stale_heartbeats().await;
103                    }
104                    _ = deadline_rescue_timer.tick() => {
105                        self.rescue_expired_deadlines().await;
106                    }
107                    _ = promote_timer.tick() => {
108                        self.promote_scheduled().await;
109                    }
110                    _ = cleanup_timer.tick() => {
111                        self.cleanup_completed().await;
112                    }
113                }
114            }
115        }
116    }
117
118    /// Advisory lock key for Awa maintenance leader election.
119    const LOCK_KEY: i64 = 0x_4157_415f_4d41_494e; // "AWA_MAIN" in hex-ish
120
121    /// Try to acquire the advisory lock for leader election.
122    ///
123    /// Returns a dedicated connection holding the lock on success, or `None` if
124    /// another instance already holds the lock. The lock is session-scoped in
125    /// PostgreSQL, so it stays held as long as this connection is alive.
126    async fn try_become_leader(&self) -> Result<Option<PoolConnection<Postgres>>, sqlx::Error> {
127        let mut conn = self.pool.acquire().await?;
128        let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
129            .bind(Self::LOCK_KEY)
130            .fetch_one(&mut *conn)
131            .await?;
132        if result.0 {
133            Ok(Some(conn))
134        } else {
135            Ok(None)
136        }
137    }
138
139    /// Release the advisory lock on the same connection that acquired it.
140    ///
141    /// Dropping the connection also releases the lock (PG session-scoped behavior),
142    /// so this is a best-effort explicit release.
143    async fn release_leader(conn: &mut PoolConnection<Postgres>) -> Result<(), sqlx::Error> {
144        sqlx::query("SELECT pg_advisory_unlock($1)")
145            .bind(Self::LOCK_KEY)
146            .execute(&mut **conn)
147            .await?;
148        Ok(())
149    }
150
151    /// Rescue jobs with stale heartbeats (crash detection).
152    #[tracing::instrument(skip(self), name = "maintenance.rescue_stale")]
153    async fn rescue_stale_heartbeats(&self) {
154        let staleness_str = format!("{} seconds", self.heartbeat_staleness.as_secs());
155        match sqlx::query_as::<_, JobRow>(
156            r#"
157            UPDATE awa.jobs
158            SET state = 'retryable',
159                finalized_at = now(),
160                heartbeat_at = NULL,
161                deadline_at = NULL,
162                errors = errors || jsonb_build_object(
163                    'error', 'heartbeat stale: worker presumed dead',
164                    'attempt', attempt,
165                    'at', now()
166                )::jsonb
167            WHERE id IN (
168                SELECT id FROM awa.jobs
169                WHERE state = 'running'
170                  AND heartbeat_at < now() - $1::interval
171                LIMIT 500
172                FOR UPDATE SKIP LOCKED
173            )
174            RETURNING *
175            "#,
176        )
177        .bind(&staleness_str)
178        .fetch_all(&self.pool)
179        .await
180        {
181            Ok(rescued) if !rescued.is_empty() => {
182                warn!(count = rescued.len(), "Rescued stale heartbeat jobs");
183            }
184            Err(err) => {
185                error!(error = %err, "Failed to rescue stale heartbeat jobs");
186            }
187            _ => {}
188        }
189    }
190
191    /// Rescue jobs that exceeded their hard deadline.
192    #[tracing::instrument(skip(self), name = "maintenance.rescue_deadline")]
193    async fn rescue_expired_deadlines(&self) {
194        match sqlx::query_as::<_, JobRow>(
195            r#"
196            UPDATE awa.jobs
197            SET state = 'retryable',
198                finalized_at = now(),
199                heartbeat_at = NULL,
200                deadline_at = NULL,
201                errors = errors || jsonb_build_object(
202                    'error', 'hard deadline exceeded',
203                    'attempt', attempt,
204                    'at', now()
205                )::jsonb
206            WHERE id IN (
207                SELECT id FROM awa.jobs
208                WHERE state = 'running'
209                  AND deadline_at IS NOT NULL
210                  AND deadline_at < now()
211                LIMIT 500
212                FOR UPDATE SKIP LOCKED
213            )
214            RETURNING *
215            "#,
216        )
217        .fetch_all(&self.pool)
218        .await
219        {
220            Ok(rescued) if !rescued.is_empty() => {
221                warn!(count = rescued.len(), "Rescued deadline-expired jobs");
222            }
223            Err(err) => {
224                error!(error = %err, "Failed to rescue deadline-expired jobs");
225            }
226            _ => {}
227        }
228    }
229
230    /// Promote scheduled jobs that are now due.
231    #[tracing::instrument(skip(self), name = "maintenance.promote")]
232    async fn promote_scheduled(&self) {
233        match sqlx::query(
234            "UPDATE awa.jobs SET state = 'available' WHERE state = 'scheduled' AND run_at <= now()",
235        )
236        .execute(&self.pool)
237        .await
238        {
239            Ok(result) if result.rows_affected() > 0 => {
240                debug!(count = result.rows_affected(), "Promoted scheduled jobs");
241            }
242            Err(err) => {
243                error!(error = %err, "Failed to promote scheduled jobs");
244            }
245            _ => {}
246        }
247
248        // Also promote retryable jobs whose backoff has elapsed
249        match sqlx::query(
250            "UPDATE awa.jobs SET state = 'available' WHERE state = 'retryable' AND run_at <= now()",
251        )
252        .execute(&self.pool)
253        .await
254        {
255            Ok(result) if result.rows_affected() > 0 => {
256                debug!(
257                    count = result.rows_affected(),
258                    "Promoted retryable jobs (backoff elapsed)"
259                );
260            }
261            Err(err) => {
262                error!(error = %err, "Failed to promote retryable jobs");
263            }
264            _ => {}
265        }
266    }
267
268    /// Clean up completed/failed/cancelled jobs past retention.
269    #[tracing::instrument(skip(self), name = "maintenance.cleanup")]
270    async fn cleanup_completed(&self) {
271        let completed_retention = format!("{} seconds", self.completed_retention.as_secs());
272        let failed_retention = format!("{} seconds", self.failed_retention.as_secs());
273
274        match sqlx::query(
275            r#"
276            DELETE FROM awa.jobs
277            WHERE id IN (
278                SELECT id FROM awa.jobs
279                WHERE (state = 'completed' AND finalized_at < now() - $1::interval)
280                   OR (state IN ('failed', 'cancelled') AND finalized_at < now() - $2::interval)
281                LIMIT 1000
282            )
283            "#,
284        )
285        .bind(&completed_retention)
286        .bind(&failed_retention)
287        .execute(&self.pool)
288        .await
289        {
290            Ok(result) if result.rows_affected() > 0 => {
291                info!(count = result.rows_affected(), "Cleaned up old jobs");
292            }
293            Err(err) => {
294                error!(error = %err, "Failed to clean up old jobs");
295            }
296            _ => {}
297        }
298    }
299}