Skip to main content

runledger_runtime/
scheduler.rs

1use std::str::FromStr;
2
3use chrono::{DateTime, Duration, Utc};
4use cron::Schedule;
5use runledger_postgres::jobs::{self, JobEnqueue};
6use serde_json::{Value, json};
7use tokio::sync::watch;
8use tokio::time::sleep;
9use tracing::{info, warn};
10
11use crate::config::JobsConfig;
12use crate::{Result, SchedulerError};
13
/// Seconds a schedule whose materialization failed is pushed into the future
/// before it becomes due again (see `failed_schedule_retry_at`).
const FAILED_SCHEDULE_RETRY_DELAY_SECONDS: i64 = 30;

// Savepoint statements used to isolate each schedule's materialization inside
// the batch transaction, so one failing schedule can be rolled back without
// discarding the work done for the others. `sqlx::query!` requires string
// literals, so `execute_savepoint_sql_tx` repeats these texts verbatim; these
// constants are what callers pass and what selects the error variant.
const CREATE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL: &str =
    "SAVEPOINT materialize_due_schedule";
const ROLLBACK_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL: &str =
    "ROLLBACK TO SAVEPOINT materialize_due_schedule";
const RELEASE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL: &str =
    "RELEASE SAVEPOINT materialize_due_schedule";
20
21pub async fn run_scheduler_loop(
22    pool: runledger_postgres::DbPool,
23    config: JobsConfig,
24    mut shutdown: watch::Receiver<bool>,
25) {
26    loop {
27        if shutdown_requested_or_closed(&shutdown) {
28            break;
29        }
30
31        if let Err(error) = materialize_due_schedules(&pool, config.claim_batch_size).await {
32            warn!(%error, "schedule materialization failed");
33        }
34
35        if wait_for_shutdown_or_poll(&mut shutdown, config.schedule_poll_interval).await {
36            break;
37        }
38    }
39
40    info!("scheduler shutdown complete");
41}
42
43fn shutdown_requested_or_closed(shutdown: &watch::Receiver<bool>) -> bool {
44    *shutdown.borrow() || shutdown.has_changed().is_err()
45}
46
47async fn wait_for_shutdown_or_poll(
48    shutdown: &mut watch::Receiver<bool>,
49    poll_interval: std::time::Duration,
50) -> bool {
51    tokio::select! {
52        changed = shutdown.changed() => changed.is_err() || *shutdown.borrow(),
53        _ = sleep(poll_interval) => false,
54    }
55}
56
57async fn materialize_due_schedules(
58    pool: &runledger_postgres::DbPool,
59    batch_size: i64,
60) -> Result<()> {
61    let mut tx = pool
62        .begin()
63        .await
64        .map_err(|error| SchedulerError::BeginTransaction {
65            source: runledger_postgres::Error::ConnectionError(error.to_string()),
66        })?;
67
68    let now = Utc::now();
69    materialize_due_schedules_tx(&mut tx, now, batch_size).await?;
70
71    tx.commit()
72        .await
73        .map_err(|error| SchedulerError::CommitTransaction {
74            source: runledger_postgres::Error::ConnectionError(error.to_string()),
75        })?;
76    Ok(())
77}
78
79fn savepoint_error_variant(
80    statement: &'static str,
81    source: runledger_postgres::Error,
82) -> SchedulerError {
83    match statement {
84        CREATE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
85            SchedulerError::SavepointCreate { statement, source }
86        }
87        ROLLBACK_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
88            SchedulerError::SavepointRollback { statement, source }
89        }
90        RELEASE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
91            SchedulerError::SavepointRelease { statement, source }
92        }
93        _ => unreachable!("unexpected savepoint statement: {statement}"),
94    }
95}
96
97async fn execute_savepoint_sql_tx(
98    tx: &mut runledger_postgres::DbTx<'_>,
99    statement: &'static str,
100) -> Result<()> {
101    match statement {
102        CREATE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
103            sqlx::query!("SAVEPOINT materialize_due_schedule")
104                .execute(&mut **tx)
105                .await
106                .map_err(|error| {
107                    savepoint_error_variant(
108                        statement,
109                        runledger_postgres::Error::ConnectionError(error.to_string()),
110                    )
111                })?;
112        }
113        ROLLBACK_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
114            sqlx::query!("ROLLBACK TO SAVEPOINT materialize_due_schedule")
115                .execute(&mut **tx)
116                .await
117                .map_err(|error| {
118                    savepoint_error_variant(
119                        statement,
120                        runledger_postgres::Error::ConnectionError(error.to_string()),
121                    )
122                })?;
123        }
124        RELEASE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL => {
125            sqlx::query!("RELEASE SAVEPOINT materialize_due_schedule")
126                .execute(&mut **tx)
127                .await
128                .map_err(|error| {
129                    savepoint_error_variant(
130                        statement,
131                        runledger_postgres::Error::ConnectionError(error.to_string()),
132                    )
133                })?;
134        }
135        _ => unreachable!("unexpected savepoint statement: {statement}"),
136    }
137
138    Ok(())
139}
140
141async fn materialize_due_schedules_tx(
142    tx: &mut runledger_postgres::DbTx<'_>,
143    now: DateTime<Utc>,
144    batch_size: i64,
145) -> Result<()> {
146    let schedules = jobs::claim_due_schedules_tx(tx, now, batch_size)
147        .await
148        .map_err(|source| SchedulerError::ClaimDueSchedules { source })?;
149    for schedule in schedules {
150        execute_savepoint_sql_tx(tx, CREATE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL).await?;
151
152        if let Err(error) = materialize_schedule_tx(tx, &schedule, now).await {
153            warn!(
154                %error,
155                schedule_id=%schedule.id,
156                schedule_name=%schedule.name,
157                "schedule materialization failed; skipping"
158            );
159            execute_savepoint_sql_tx(tx, ROLLBACK_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL).await?;
160            execute_savepoint_sql_tx(tx, RELEASE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL).await?;
161
162            // Push failed schedules out of the immediate due window to avoid
163            // repeatedly selecting the same failing rows and starving valid schedules.
164            let retry_at = failed_schedule_retry_at(now);
165            defer_failed_schedule_tx(tx, schedule.id, retry_at).await?;
166            continue;
167        }
168
169        execute_savepoint_sql_tx(tx, RELEASE_MATERIALIZE_DUE_SCHEDULE_SAVEPOINT_SQL).await?;
170    }
171
172    Ok(())
173}
174
175async fn defer_failed_schedule_tx(
176    tx: &mut runledger_postgres::DbTx<'_>,
177    schedule_id: uuid::Uuid,
178    next_fire_at: DateTime<Utc>,
179) -> Result<()> {
180    sqlx::query!(
181        "UPDATE job_schedules
182         SET next_fire_at = $2,
183             updated_at = now()
184         WHERE id = $1",
185        schedule_id,
186        next_fire_at,
187    )
188    .execute(&mut **tx)
189    .await
190    .map_err(|error| SchedulerError::DeferFailedSchedule {
191        schedule_id,
192        source: runledger_postgres::Error::from_query_sqlx_with_context(
193            "defer failed schedule",
194            error,
195        ),
196    })?;
197
198    Ok(())
199}
200
201async fn materialize_schedule_tx(
202    tx: &mut runledger_postgres::DbTx<'_>,
203    schedule: &jobs::JobScheduleRecord,
204    now: DateTime<Utc>,
205) -> Result<()> {
206    let next_fire_at = compute_next_fire_at_utc(
207        &schedule.cron_expr,
208        now,
209        schedule.id,
210        schedule.max_jitter_seconds,
211    )
212    .ok_or_else(|| invalid_schedule_cron_error(schedule))?;
213
214    let mut payload = schedule.payload_template.clone();
215    merge_schedule_metadata(&mut payload, schedule.id, &schedule.name);
216
217    let enqueue_payload = JobEnqueue {
218        job_type: schedule.job_type.as_borrowed(),
219        organization_id: schedule.organization_id,
220        payload: &payload,
221        priority: None,
222        max_attempts: None,
223        timeout_seconds: None,
224        next_run_at: Some(now),
225        idempotency_key: None,
226        stage: Some(runledger_core::jobs::JobStage::Scheduled),
227    };
228
229    jobs::enqueue_job_tx(tx, &enqueue_payload)
230        .await
231        .map_err(|source| SchedulerError::EnqueueScheduledJob {
232            schedule_id: schedule.id,
233            job_type: schedule.job_type.to_string(),
234            source,
235        })?;
236
237    jobs::mark_schedule_fired_tx(tx, schedule.id, now, next_fire_at)
238        .await
239        .map_err(|source| SchedulerError::MarkScheduleFired {
240            schedule_id: schedule.id,
241            source,
242        })?;
243    Ok(())
244}
245
246fn invalid_schedule_cron_error(schedule: &jobs::JobScheduleRecord) -> SchedulerError {
247    SchedulerError::InvalidCronExpression {
248        schedule_id: schedule.id,
249        schedule_name: schedule.name.clone(),
250        cron_expr: schedule.cron_expr.clone(),
251    }
252}
253
254fn merge_schedule_metadata(payload: &mut Value, schedule_id: uuid::Uuid, schedule_name: &str) {
255    let metadata = json!({
256        "schedule_id": schedule_id,
257        "schedule_name": schedule_name,
258    });
259
260    match payload {
261        Value::Object(map) => {
262            map.insert("_schedule".to_string(), metadata);
263        }
264        _ => {
265            let original_payload = std::mem::take(payload);
266            *payload = json!({
267                "payload": original_payload,
268                "_schedule": metadata,
269            });
270        }
271    }
272}
273
274/// Scheduling semantics are UTC-only across the jobs framework.
275fn compute_next_fire_at_utc(
276    cron_expr: &str,
277    from: DateTime<Utc>,
278    schedule_id: uuid::Uuid,
279    max_jitter_seconds: i32,
280) -> Option<DateTime<Utc>> {
281    let schedule = Schedule::from_str(cron_expr).ok()?;
282    let next = schedule.upcoming(Utc).find(|next| *next > from)?;
283    let jitter = schedule_jitter_seconds(schedule_id, next, max_jitter_seconds);
284    Some(next + Duration::seconds(jitter))
285}
286
287fn schedule_jitter_seconds(
288    schedule_id: uuid::Uuid,
289    next_fire_at: DateTime<Utc>,
290    max_jitter_seconds: i32,
291) -> i64 {
292    if max_jitter_seconds <= 0 {
293        return 0;
294    }
295
296    let max_range = max_jitter_seconds as u128 + 1;
297    let next_millis = next_fire_at.timestamp_millis() as u128;
298    ((schedule_id.as_u128() ^ next_millis) % max_range) as i64
299}
300
301fn failed_schedule_retry_at(now: DateTime<Utc>) -> DateTime<Utc> {
302    now + Duration::seconds(FAILED_SCHEDULE_RETRY_DELAY_SECONDS)
303}
304
// Unit tests live in an out-of-line module and are compiled only for test builds.
#[cfg(test)]
mod tests;