awa_worker/maintenance.rs

use awa_model::JobRow;
use sqlx::pool::PoolConnection;
use sqlx::{PgPool, Postgres};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

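/// Periodic maintenance for the job queue: rescuing jobs whose workers died or
/// overran their deadline, promoting scheduled/retryable jobs, and pruning
/// finished rows. Only one process runs these tasks at a time, coordinated
/// through a Postgres advisory lock.
///
/// A minimal usage sketch (illustrative only; assumes an existing `PgPool` and
/// a running tokio runtime):
///
/// ```ignore
/// let leader = Arc::new(AtomicBool::new(false));
/// let cancel = CancellationToken::new();
/// let service = MaintenanceService::new(pool, leader.clone(), cancel.clone());
/// tokio::spawn(async move { service.run().await });
/// // ...on shutdown:
/// cancel.cancel();
/// ```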
pub struct MaintenanceService {
    pool: PgPool,
    cancel: CancellationToken,
    leader: Arc<AtomicBool>,
    heartbeat_rescue_interval: Duration,
    deadline_rescue_interval: Duration,
    promote_interval: Duration,
    cleanup_interval: Duration,
    heartbeat_staleness: Duration,
    completed_retention: Duration,
    failed_retention: Duration,
}

impl MaintenanceService {
    pub fn new(pool: PgPool, leader: Arc<AtomicBool>, cancel: CancellationToken) -> Self {
        Self {
            pool,
            cancel,
            leader,
            heartbeat_rescue_interval: Duration::from_secs(30),
            deadline_rescue_interval: Duration::from_secs(30),
            promote_interval: Duration::from_secs(5),
            cleanup_interval: Duration::from_secs(60),
            heartbeat_staleness: Duration::from_secs(90),
            completed_retention: Duration::from_secs(86_400), // 24 hours
            failed_retention: Duration::from_secs(259_200),   // 72 hours
        }
    }

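    /// Main service loop. Repeatedly attempts to become the maintenance
    /// leader; while not elected it re-checks every 10 seconds, and once
    /// elected it drives the rescue, promotion, and cleanup timers until the
    /// cancellation token fires, releasing the advisory lock on shutdown.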
    pub async fn run(&self) {
        info!("Maintenance service starting");
        self.leader.store(false, Ordering::SeqCst);

        loop {
            let mut leader_conn = match self.try_become_leader().await {
                Ok(Some(conn)) => conn,
                Ok(None) => {
                    // Another process holds the lock; wait and retry unless cancelled.
                    tokio::select! {
                        _ = self.cancel.cancelled() => {
                            debug!("Maintenance service shutting down (not leader)");
                            self.leader.store(false, Ordering::SeqCst);
                            return;
                        }
                        _ = tokio::time::sleep(Duration::from_secs(10)) => continue,
                    }
                }
                Err(err) => {
                    warn!(error = %err, "Failed to check leader status");
                    tokio::select! {
                        _ = self.cancel.cancelled() => {
                            debug!("Maintenance service shutting down (leader check failed)");
                            self.leader.store(false, Ordering::SeqCst);
                            return;
                        }
                        _ = tokio::time::sleep(Duration::from_secs(10)) => continue,
                    }
                }
            };

            debug!("Elected as maintenance leader");
            self.leader.store(true, Ordering::SeqCst);

            let mut heartbeat_rescue_timer = tokio::time::interval(self.heartbeat_rescue_interval);
            let mut deadline_rescue_timer = tokio::time::interval(self.deadline_rescue_interval);
            let mut promote_timer = tokio::time::interval(self.promote_interval);
            let mut cleanup_timer = tokio::time::interval(self.cleanup_interval);

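            // A tokio interval's first tick completes immediately; consume it
            // up front so each task waits a full period after election.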
            heartbeat_rescue_timer.tick().await;
            deadline_rescue_timer.tick().await;
            promote_timer.tick().await;
            cleanup_timer.tick().await;

            loop {
                tokio::select! {
                    _ = self.cancel.cancelled() => {
                        debug!("Maintenance service shutting down");
                        self.leader.store(false, Ordering::SeqCst);
                        let _ = Self::release_leader(&mut leader_conn).await;
                        return;
                    }
                    _ = heartbeat_rescue_timer.tick() => {
                        self.rescue_stale_heartbeats().await;
                    }
                    _ = deadline_rescue_timer.tick() => {
                        self.rescue_expired_deadlines().await;
                    }
                    _ = promote_timer.tick() => {
                        self.promote_scheduled().await;
                    }
                    _ = cleanup_timer.tick() => {
                        self.cleanup_completed().await;
                    }
                }
            }
        }
    }

    /// Application-wide advisory lock key for maintenance leadership
    /// (the hex spells "AWA_MAIN" in ASCII).
    const LOCK_KEY: i64 = 0x_4157_415f_4d41_494e;

    /// Try to acquire the Postgres advisory lock that elects the maintenance
    /// leader. On success, returns the connection holding the lock; the lock
    /// is session-level, so it stays held for as long as this connection lives.
    async fn try_become_leader(&self) -> Result<Option<PoolConnection<Postgres>>, sqlx::Error> {
        let mut conn = self.pool.acquire().await?;
        let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
            .bind(Self::LOCK_KEY)
            .fetch_one(&mut *conn)
            .await?;
        if result.0 {
            Ok(Some(conn))
        } else {
            Ok(None)
        }
    }

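    /// Release the advisory lock when the leader shuts down. A session-level
    /// advisory lock would also be released if the connection closed, but an
    /// explicit unlock hands leadership over promptly.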
    async fn release_leader(conn: &mut PoolConnection<Postgres>) -> Result<(), sqlx::Error> {
        sqlx::query("SELECT pg_advisory_unlock($1)")
            .bind(Self::LOCK_KEY)
            .execute(&mut **conn)
            .await?;
        Ok(())
    }

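    /// Move 'running' jobs whose heartbeat is older than `heartbeat_staleness`
    /// back to 'retryable', recording the error, in batches of up to 500 rows
    /// (SKIP LOCKED avoids contending with rows locked by other sessions).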
    #[tracing::instrument(skip(self), name = "maintenance.rescue_stale")]
    async fn rescue_stale_heartbeats(&self) {
        let staleness_str = format!("{} seconds", self.heartbeat_staleness.as_secs());
        match sqlx::query_as::<_, JobRow>(
            r#"
            UPDATE awa.jobs
            SET state = 'retryable',
                finalized_at = now(),
                heartbeat_at = NULL,
                deadline_at = NULL,
                errors = errors || jsonb_build_object(
                    'error', 'heartbeat stale: worker presumed dead',
                    'attempt', attempt,
                    'at', now()
                )::jsonb
            WHERE id IN (
                SELECT id FROM awa.jobs
                WHERE state = 'running'
                  AND heartbeat_at < now() - $1::interval
                LIMIT 500
                FOR UPDATE SKIP LOCKED
            )
            RETURNING *
            "#,
        )
        .bind(&staleness_str)
        .fetch_all(&self.pool)
        .await
        {
            Ok(rescued) if !rescued.is_empty() => {
                warn!(count = rescued.len(), "Rescued stale heartbeat jobs");
            }
            Err(err) => {
                error!(error = %err, "Failed to rescue stale heartbeat jobs");
            }
            _ => {}
        }
    }

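    /// Move 'running' jobs whose hard deadline has passed back to 'retryable',
    /// recording the error, in batches of up to 500 rows.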
    #[tracing::instrument(skip(self), name = "maintenance.rescue_deadline")]
    async fn rescue_expired_deadlines(&self) {
        match sqlx::query_as::<_, JobRow>(
            r#"
            UPDATE awa.jobs
            SET state = 'retryable',
                finalized_at = now(),
                heartbeat_at = NULL,
                deadline_at = NULL,
                errors = errors || jsonb_build_object(
                    'error', 'hard deadline exceeded',
                    'attempt', attempt,
                    'at', now()
                )::jsonb
            WHERE id IN (
                SELECT id FROM awa.jobs
                WHERE state = 'running'
                  AND deadline_at IS NOT NULL
                  AND deadline_at < now()
                LIMIT 500
                FOR UPDATE SKIP LOCKED
            )
            RETURNING *
            "#,
        )
        .fetch_all(&self.pool)
        .await
        {
            Ok(rescued) if !rescued.is_empty() => {
                warn!(count = rescued.len(), "Rescued deadline-expired jobs");
            }
            Err(err) => {
                error!(error = %err, "Failed to rescue deadline-expired jobs");
            }
            _ => {}
        }
    }

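    /// Promote 'scheduled' jobs whose run_at has arrived, and 'retryable' jobs
    /// whose backoff has elapsed, to 'available' so workers can pick them up.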
    #[tracing::instrument(skip(self), name = "maintenance.promote")]
    async fn promote_scheduled(&self) {
        match sqlx::query(
            "UPDATE awa.jobs SET state = 'available' WHERE state = 'scheduled' AND run_at <= now()",
        )
        .execute(&self.pool)
        .await
        {
            Ok(result) if result.rows_affected() > 0 => {
                debug!(count = result.rows_affected(), "Promoted scheduled jobs");
            }
            Err(err) => {
                error!(error = %err, "Failed to promote scheduled jobs");
            }
            _ => {}
        }

        match sqlx::query(
            "UPDATE awa.jobs SET state = 'available' WHERE state = 'retryable' AND run_at <= now()",
        )
        .execute(&self.pool)
        .await
        {
            Ok(result) if result.rows_affected() > 0 => {
                debug!(
                    count = result.rows_affected(),
                    "Promoted retryable jobs (backoff elapsed)"
                );
            }
            Err(err) => {
                error!(error = %err, "Failed to promote retryable jobs");
            }
            _ => {}
        }
    }

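    /// Delete finished jobs past their retention window: completed jobs after
    /// `completed_retention`, failed/cancelled jobs after `failed_retention`,
    /// at most 1000 rows per pass.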
    #[tracing::instrument(skip(self), name = "maintenance.cleanup")]
    async fn cleanup_completed(&self) {
        let completed_retention = format!("{} seconds", self.completed_retention.as_secs());
        let failed_retention = format!("{} seconds", self.failed_retention.as_secs());

        match sqlx::query(
            r#"
            DELETE FROM awa.jobs
            WHERE id IN (
                SELECT id FROM awa.jobs
                WHERE (state = 'completed' AND finalized_at < now() - $1::interval)
                   OR (state IN ('failed', 'cancelled') AND finalized_at < now() - $2::interval)
                LIMIT 1000
            )
            "#,
        )
        .bind(&completed_retention)
        .bind(&failed_retention)
        .execute(&self.pool)
        .await
        {
            Ok(result) if result.rows_affected() > 0 => {
                info!(count = result.rows_affected(), "Cleaned up old jobs");
            }
            Err(err) => {
                error!(error = %err, "Failed to clean up old jobs");
            }
            _ => {}
        }
    }
}