1use std::str::FromStr;
2
3use chrono::{DateTime, Duration, Utc};
4use cron::Schedule;
5use runledger_postgres::jobs::{self, JobEnqueue};
6use serde_json::{Value, json};
7use tokio::sync::watch;
8use tokio::time::sleep;
9use tracing::{info, warn};
10
11use crate::config::JobsConfig;
12use crate::{Result, SchedulerError};
13
/// Seconds to push a schedule's `next_fire_at` forward after its
/// materialization failed, before it is attempted again.
const FAILED_SCHEDULE_RETRY_DELAY_SECONDS: i64 = 30;

/// Opens the savepoint that isolates the writes made for a single schedule.
const CREATE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL: &str = "SAVEPOINT materialize_due_schedule";

/// Undoes every write made since the per-schedule savepoint was opened.
const ROLLBACK_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL: &str =
    "ROLLBACK TO SAVEPOINT materialize_due_schedule";

/// Discards the per-schedule savepoint once it is no longer needed.
const RELEASE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL: &str =
    "RELEASE SAVEPOINT materialize_due_schedule";
20
21pub async fn run_scheduler_loop(
22 pool: runledger_postgres::DbPool,
23 config: JobsConfig,
24 mut shutdown: watch::Receiver<bool>,
25) {
26 loop {
27 if shutdown_requested_or_closed(&shutdown) {
28 break;
29 }
30
31 if let Err(error) = materialize_due_schedules(&pool, config.claim_batch_size).await {
32 warn!(%error, "schedule materialization failed");
33 }
34
35 if wait_for_shutdown_or_poll(&mut shutdown, config.schedule_poll_interval).await {
36 break;
37 }
38 }
39
40 info!("scheduler shutdown complete");
41}
42
43fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
44 *shutdown.borrow() || shutdown.has_changed().is_err()
45}
46
47async fn wait_for_shutdown_or_poll(
48 shutdown: &mut watch::Receiver<bool>,
49 poll_interval: std::time::Duration,
50) -> bool {
51 tokio::select! {
52 changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
53 _ = sleep(poll_interval) => false,
54 }
55}
56
57async fn materialize_due_schedules(
58 pool: &runledger_postgres::DbPool,
59 batch_size: i64,
60) -> Result<()> {
61 let mut tx = pool
62 .begin()
63 .await
64 .map_err(|error| SchedulerError::BeginTransaction {
65 source: runledger_postgres::Error::ConnectionError(error.to_string()),
66 })?;
67
68 let now = Utc::now();
69 materialize_due_schedules_tx(&mut tx, now, batch_size).await?;
70
71 tx.commit()
72 .await
73 .map_err(|error| SchedulerError::CommitTransaction {
74 source: runledger_postgres::Error::ConnectionError(error.to_string()),
75 })?;
76 Ok(())
77}
78
79fn savepoint_error_variant(
80 statement: &'static str,
81 source: runledger_postgres::Error,
82) -> SchedulerError {
83 match statement {
84 CREATE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
85 SchedulerError::SavepointCreate { statement, source }
86 }
87 ROLLBACK_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
88 SchedulerError::SavepointRollback { statement, source }
89 }
90 RELEASE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
91 SchedulerError::SavepointRelease { statement, source }
92 }
93 _ => unreachable!("unexpected savepoint statement: {statement}"),
94 }
95}
96
97async fn execute_savepoint_sql_tx(
98 tx: &mut runledger_postgres::DbTx<'_>,
99 statement: &'static str,
100) -> Result<()> {
101 match statement {
102 CREATE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
103 sqlx::query!("SAVEPOINT materialize_due_schedule")
104 .execute(&mut **tx)
105 .await
106 .map_err(|error| {
107 savepoint_error_variant(
108 statement,
109 runledger_postgres::Error::ConnectionError(error.to_string()),
110 )
111 })?;
112 }
113 ROLLBACK_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
114 sqlx::query!("ROLLBACK TO SAVEPOINT materialize_due_schedule")
115 .execute(&mut **tx)
116 .await
117 .map_err(|error| {
118 savepoint_error_variant(
119 statement,
120 runledger_postgres::Error::ConnectionError(error.to_string()),
121 )
122 })?;
123 }
124 RELEASE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
125 sqlx::query!("RELEASE SAVEPOINT materialize_due_schedule")
126 .execute(&mut **tx)
127 .await
128 .map_err(|error| {
129 savepoint_error_variant(
130 statement,
131 runledger_postgres::Error::ConnectionError(error.to_string()),
132 )
133 })?;
134 }
135 _ => unreachable!("unexpected savepoint statement: {statement}"),
136 }
137
138 Ok(())
139}
140
141async fn materialize_due_schedules_tx(
142 tx: &mut runledger_postgres::DbTx<'_>,
143 now: DateTime<Utc>,
144 batch_size: i64,
145) -> Result<()> {
146 let schedules = jobs::claim_due_schedules_tx(tx, now, batch_size)
147 .await
148 .map_err(|source| SchedulerError::ClaimDueSchedules { source })?;
149 for schedule in schedules {
150 execute_savepoint_sql_tx(tx, CREATE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL).await?;
151
152 if let Err(error) = materialize_schedule_tx(tx, &schedule, now).await {
153 warn!(
154 %error,
155 schedule_id=%schedule.id,
156 schedule_name=%schedule.name,
157 "schedule materialization failed; skipping"
158 );
159 execute_savepoint_sql_tx(tx, ROLLBACK_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL).await?;
160 execute_savepoint_sql_tx(tx, RELEASE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL).await?;
161
162 let retry_at = failed_schedule_retry_at(now);
165 defer_failed_schedule_tx(tx, schedule.id, retry_at).await?;
166 continue;
167 }
168
169 execute_savepoint_sql_tx(tx, RELEASE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL).await?;
170 }
171
172 Ok(())
173}
174
175async fn defer_failed_schedule_tx(
176 tx: &mut runledger_postgres::DbTx<'_>,
177 schedule_id: uuid::Uuid,
178 next_fire_at: DateTime<Utc>,
179) -> Result<()> {
180 sqlx::query!(
181 "UPDATE job_schedules
182 SET next_fire_at = $2,
183 updated_at = now()
184 WHERE id = $1",
185 schedule_id,
186 next_fire_at,
187 )
188 .execute(&mut **tx)
189 .await
190 .map_err(|error| SchedulerError::DeferFailedSchedule {
191 schedule_id,
192 source: runledger_postgres::Error::from_query_sqlx_with_context(
193 "defer failed schedule",
194 error,
195 ),
196 })?;
197
198 Ok(())
199}
200
201async fn materialize_schedule_tx(
202 tx: &mut runledger_postgres::DbTx<'_>,
203 schedule: &jobs::JobScheduleRecord,
204 now: DateTime<Utc>,
205) -> Result<()> {
206 let next_fire_at = compute_next_fire_at_utc(
207 &schedule.cron_expr,
208 now,
209 schedule.id,
210 schedule.max_jitter_seconds,
211 )
212 .ok_or_else(|| invalid_schedule_cron_error(schedule))?;
213
214 let mut payload = schedule.payload_template.clone();
215 merge_schedule_metadata(&mut payload, schedule.id, &schedule.name);
216
217 let enqueue_payload = JobEnqueue {
218 job_type: schedule.job_type.as_borrowed(),
219 organization_id: schedule.organization_id,
220 payload: &payload,
221 priority: None,
222 max_attempts: None,
223 timeout_seconds: None,
224 next_run_at: Some(now),
225 idempotency_key: None,
226 stage: Some(runledger_core::jobs::JobStage::Scheduled),
227 };
228
229 jobs::enqueue_job_tx(tx, &enqueue_payload)
230 .await
231 .map_err(|source| SchedulerError::EnqueueScheduledJob {
232 schedule_id: schedule.id,
233 job_type: schedule.job_type.to_string(),
234 source,
235 })?;
236
237 jobs::mark_schedule_fired_tx(tx, schedule.id, now, next_fire_at)
238 .await
239 .map_err(|source| SchedulerError::MarkScheduleFired {
240 schedule_id: schedule.id,
241 source,
242 })?;
243 Ok(())
244}
245
246fn invalid_schedule_cron_error(schedule: &jobs::JobScheduleRecord) -> SchedulerError {
247 SchedulerError::InvalidCronExpression {
248 schedule_id: schedule.id,
249 schedule_name: schedule.name.clone(),
250 cron_expr: schedule.cron_expr.clone(),
251 }
252}
253
254fn merge_schedule_metadata(payload: &mut Value, schedule_id: uuid::Uuid, schedule_name: &str) {
255 let metadata = json!({
256 "schedule_id": schedule_id,
257 "schedule_name": schedule_name,
258 });
259
260 match payload {
261 Value::Object(map) => {
262 map.insert("_schedule".to_string(), metadata);
263 }
264 _ => {
265 let original_payload = std::mem::take(payload);
266 *payload = json!({
267 "payload": original_payload,
268 "_schedule": metadata,
269 });
270 }
271 }
272}
273
274fn compute_next_fire_at_utc(
276 cron_expr: &str,
277 from: DateTime<Utc>,
278 schedule_id: uuid::Uuid,
279 max_jitter_seconds: i32,
280) -> Option<DateTime<Utc>> {
281 let schedule = Schedule::from_str(cron_expr).ok()?;
282 let next = schedule.upcoming(Utc).find(|next| *next > from)?;
283 let jitter = schedule_jitter_seconds(schedule_id, next, max_jitter_seconds);
284 Some(next + Duration::seconds(jitter))
285}
286
287fn schedule_jitter_seconds(
288 schedule_id: uuid::Uuid,
289 next_fire_at: DateTime<Utc>,
290 max_jitter_seconds: i32,
291) -> i64 {
292 if max_jitter_seconds <= 0 {
293 return 0;
294 }
295
296 let max_range = max_jitter_seconds as u128 + 1;
297 let next_millis = next_fire_at.timestamp_millis() as u128;
298 ((schedule_id.as_u128() ^ next_millis) % max_range) as i64
299}
300
301fn failed_schedule_retry_at(now: DateTime<Utc>) -> DateTime<Utc> {
302 now + Duration::seconds(FAILED_SCHEDULE_RETRY_DELAY_SECONDS)
303}
304
// Unit tests live in an out-of-line `tests` module that is compiled only for
// test builds (`cfg(test)`).
#[cfg(test)]
mod tests;