1use thiserror::Error;
2
3pub type Result<T> = std::result::Result<T, Error>;
4
5#[derive(Debug, Error)]
6pub enum Error {
7 #[error(transparent)]
8 Scheduler(#[from] SchedulerError),
9 #[error(transparent)]
10 Worker(#[from] WorkerError),
11 #[error(transparent)]
12 Reaper(#[from] ReaperError),
13 #[error(transparent)]
14 Runtime(#[from] RuntimeError),
15}
16
17#[derive(Debug, Error)]
18pub enum SchedulerError {
19 #[error("failed to begin scheduler transaction")]
20 BeginTransaction { source: runledger_postgres::Error },
21 #[error("failed to commit scheduler transaction")]
22 CommitTransaction { source: runledger_postgres::Error },
23 #[error("failed to claim due schedules")]
24 ClaimDueSchedules { source: runledger_postgres::Error },
25 #[error("failed to create savepoint using `{statement}`")]
26 SavepointCreate {
27 statement: &'static str,
28 source: runledger_postgres::Error,
29 },
30 #[error("failed to rollback savepoint using `{statement}`")]
31 SavepointRollback {
32 statement: &'static str,
33 source: runledger_postgres::Error,
34 },
35 #[error("failed to release savepoint using `{statement}`")]
36 SavepointRelease {
37 statement: &'static str,
38 source: runledger_postgres::Error,
39 },
40 #[error("failed deferring failed schedule `{schedule_id}`")]
41 DeferFailedSchedule {
42 schedule_id: uuid::Uuid,
43 source: runledger_postgres::Error,
44 },
45 #[error(
46 "invalid cron expression for schedule `{schedule_name}` ({schedule_id}): `{cron_expr}`"
47 )]
48 InvalidCronExpression {
49 schedule_id: uuid::Uuid,
50 schedule_name: String,
51 cron_expr: String,
52 },
53 #[error("failed enqueueing scheduled job `{job_type}` from schedule `{schedule_id}`")]
54 EnqueueScheduledJob {
55 schedule_id: uuid::Uuid,
56 job_type: String,
57 source: runledger_postgres::Error,
58 },
59 #[error("failed marking schedule `{schedule_id}` as fired")]
60 MarkScheduleFired {
61 schedule_id: uuid::Uuid,
62 source: runledger_postgres::Error,
63 },
64 #[error("claimed schedule `{schedule_id}` was missing while {operation}")]
67 ClaimedScheduleMissing {
68 schedule_id: uuid::Uuid,
69 operation: &'static str,
70 },
71}
72
73#[derive(Debug, Error)]
74pub enum WorkerError {
75 #[error("failed claiming jobs for worker `{worker_id}`")]
76 ClaimJobs {
77 worker_id: String,
78 source: runledger_postgres::Error,
79 },
80 #[error("failed setting running progress for job `{job_id}` attempt `{attempt}`")]
81 SetRunningProgress {
82 job_id: uuid::Uuid,
83 attempt: i32,
84 source: runledger_postgres::Error,
85 },
86 #[error("failed releasing unstarted claim for job `{job_id}` attempt `{attempt}`")]
87 ReleaseUnstartedClaim {
88 job_id: uuid::Uuid,
89 attempt: i32,
90 source: runledger_postgres::Error,
91 },
92 #[error("failed completing job `{job_id}` attempt `{attempt}` as success")]
93 CompleteSuccess {
94 job_id: uuid::Uuid,
95 attempt: i32,
96 source: runledger_postgres::Error,
97 },
98 #[error("failed completing job `{job_id}` attempt `{attempt}` as failure")]
99 CompleteFailure {
100 job_id: uuid::Uuid,
101 attempt: i32,
102 source: runledger_postgres::Error,
103 },
104 #[error("failed heartbeat for job `{job_id}` attempt `{attempt}`")]
105 Heartbeat {
106 job_id: uuid::Uuid,
107 attempt: i32,
108 source: runledger_postgres::Error,
109 },
110}
111
112#[derive(Debug, Error)]
113pub enum ReaperError {
114 #[error(
115 "failed reaping expired leases with batch_size `{batch_size}` and retry_delay_ms `{retry_delay_ms}`"
116 )]
117 ReapExpiredLeases {
118 batch_size: i64,
119 retry_delay_ms: i32,
120 source: runledger_postgres::Error,
121 },
122}
123
124#[derive(Debug, Error)]
125pub enum RuntimeError {
126 #[error(
133 "supervisor builder received both a job registry and a job catalog; choose one registration source"
134 )]
135 MixedRegistrySources,
136 #[error(
143 "supervisor builder requires a job registry when worker or reaper loops are enabled \
144 (worker_enabled={worker_enabled}, reaper_enabled={reaper_enabled})"
145 )]
146 MissingRegistry {
147 worker_enabled: bool,
148 reaper_enabled: bool,
149 },
150 #[error("supervisor builder requires an active Tokio runtime")]
154 MissingTokioRuntime {
155 #[source]
156 source: tokio::runtime::TryCurrentError,
157 },
158 #[error("jobs runtime task `{task}` exited unexpectedly before shutdown")]
163 TaskExitedUnexpectedly { task: &'static str },
164 #[error("failed joining jobs runtime task `{task}`")]
167 TaskJoin {
168 task: &'static str,
169 #[source]
170 source: tokio::task::JoinError,
171 },
172 #[error("jobs runtime shutdown exceeded timeout {timeout:?}")]
177 ShutdownTimeout { timeout: std::time::Duration },
178 #[error("jobs runtime shutdown timeout {timeout:?} is too large to represent")]
181 ShutdownTimeoutTooLarge { timeout: std::time::Duration },
182 #[error(
186 "jobs runtime shutdown exceeded timeout {timeout:?} while draining after earlier task failure"
187 )]
188 ShutdownTimeoutAfterTaskError {
189 timeout: std::time::Duration,
190 #[source]
191 source: Box<RuntimeError>,
192 },
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `InvalidCronExpression` fixture shared by the tests below.
    fn invalid_cron_error(schedule_id: uuid::Uuid) -> SchedulerError {
        SchedulerError::InvalidCronExpression {
            schedule_id,
            schedule_name: "nightly sync".to_string(),
            cron_expr: "not cron".to_string(),
        }
    }

    /// The variant keeps the schedule id, name and cron expression it was built with.
    #[test]
    fn scheduler_invalid_cron_variant_contains_schedule_metadata() {
        let schedule_id = uuid::Uuid::nil();

        match invalid_cron_error(schedule_id) {
            SchedulerError::InvalidCronExpression {
                schedule_id: actual_id,
                schedule_name,
                cron_expr,
            } => {
                assert_eq!(actual_id, schedule_id);
                assert_eq!(schedule_name, "nightly sync");
                assert_eq!(cron_expr, "not cron");
            }
            other => panic!("expected invalid cron variant, got: {other:?}"),
        }
    }

    /// The rendered message interpolates every metadata field.
    ///
    /// The field round-trip above cannot catch a broken format string, so the
    /// `Display` output is pinned here as well.
    #[test]
    fn scheduler_invalid_cron_message_renders_schedule_metadata() {
        let schedule_id = uuid::Uuid::nil();
        let rendered = invalid_cron_error(schedule_id).to_string();

        // The id is formatted through the same `Display` impl the error message uses.
        let expected = format!(
            "invalid cron expression for schedule `nightly sync` ({schedule_id}): `not cron`"
        );
        assert_eq!(rendered, expected);
    }
}