Skip to main content

runledger_runtime/
error.rs

1use thiserror::Error;
2
3pub type Result<T> = std::result::Result<T, Error>;
4
5#[derive(Debug, Error)]
6pub enum Error {
7    #[error(transparent)]
8    Scheduler(#[from] SchedulerError),
9    #[error(transparent)]
10    Worker(#[from] WorkerError),
11    #[error(transparent)]
12    Reaper(#[from] ReaperError),
13    #[error(transparent)]
14    Runtime(#[from] RuntimeError),
15}
16
17#[derive(Debug, Error)]
18pub enum SchedulerError {
19    #[error("failed to begin scheduler transaction")]
20    BeginTransaction { source: runledger_postgres::Error },
21    #[error("failed to commit scheduler transaction")]
22    CommitTransaction { source: runledger_postgres::Error },
23    #[error("failed to claim due schedules")]
24    ClaimDueSchedules { source: runledger_postgres::Error },
25    #[error("failed to create savepoint using `{statement}`")]
26    SavepointCreate {
27        statement: &'static str,
28        source: runledger_postgres::Error,
29    },
30    #[error("failed to rollback savepoint using `{statement}`")]
31    SavepointRollback {
32        statement: &'static str,
33        source: runledger_postgres::Error,
34    },
35    #[error("failed to release savepoint using `{statement}`")]
36    SavepointRelease {
37        statement: &'static str,
38        source: runledger_postgres::Error,
39    },
40    #[error("failed deferring failed schedule `{schedule_id}`")]
41    DeferFailedSchedule {
42        schedule_id: uuid::Uuid,
43        source: runledger_postgres::Error,
44    },
45    #[error(
46        "invalid cron expression for schedule `{schedule_name}` ({schedule_id}): `{cron_expr}`"
47    )]
48    InvalidCronExpression {
49        schedule_id: uuid::Uuid,
50        schedule_name: String,
51        cron_expr: String,
52    },
53    #[error("failed enqueueing scheduled job `{job_type}` from schedule `{schedule_id}`")]
54    EnqueueScheduledJob {
55        schedule_id: uuid::Uuid,
56        job_type: String,
57        source: runledger_postgres::Error,
58    },
59    #[error("failed marking schedule `{schedule_id}` as fired")]
60    MarkScheduleFired {
61        schedule_id: uuid::Uuid,
62        source: runledger_postgres::Error,
63    },
64    /// A schedule row returned by the runtime claim query disappeared before the
65    /// scheduler could advance or defer its next fire cursor.
66    #[error("claimed schedule `{schedule_id}` was missing while {operation}")]
67    ClaimedScheduleMissing {
68        schedule_id: uuid::Uuid,
69        operation: &'static str,
70    },
71}
72
73#[derive(Debug, Error)]
74pub enum WorkerError {
75    #[error("failed claiming jobs for worker `{worker_id}`")]
76    ClaimJobs {
77        worker_id: String,
78        source: runledger_postgres::Error,
79    },
80    #[error("failed setting running progress for job `{job_id}` attempt `{attempt}`")]
81    SetRunningProgress {
82        job_id: uuid::Uuid,
83        attempt: i32,
84        source: runledger_postgres::Error,
85    },
86    #[error("failed releasing unstarted claim for job `{job_id}` attempt `{attempt}`")]
87    ReleaseUnstartedClaim {
88        job_id: uuid::Uuid,
89        attempt: i32,
90        source: runledger_postgres::Error,
91    },
92    #[error("failed completing job `{job_id}` attempt `{attempt}` as success")]
93    CompleteSuccess {
94        job_id: uuid::Uuid,
95        attempt: i32,
96        source: runledger_postgres::Error,
97    },
98    #[error("failed completing job `{job_id}` attempt `{attempt}` as failure")]
99    CompleteFailure {
100        job_id: uuid::Uuid,
101        attempt: i32,
102        source: runledger_postgres::Error,
103    },
104    #[error("failed heartbeat for job `{job_id}` attempt `{attempt}`")]
105    Heartbeat {
106        job_id: uuid::Uuid,
107        attempt: i32,
108        source: runledger_postgres::Error,
109    },
110}
111
112#[derive(Debug, Error)]
113pub enum ReaperError {
114    #[error(
115        "failed reaping expired leases with batch_size `{batch_size}` and retry_delay_ms `{retry_delay_ms}`"
116    )]
117    ReapExpiredLeases {
118        batch_size: i64,
119        retry_delay_ms: i32,
120        source: runledger_postgres::Error,
121    },
122}
123
124#[derive(Debug, Error)]
125pub enum RuntimeError {
126    /// The supervisor builder received both direct registry and catalog
127    /// registration sources. Choose either [`SupervisorBuilder::with_registry`]
128    /// or [`SupervisorBuilder::with_catalog`] for a single builder.
129    ///
130    /// [`SupervisorBuilder::with_catalog`]: crate::supervisor::SupervisorBuilder::with_catalog
131    /// [`SupervisorBuilder::with_registry`]: crate::supervisor::SupervisorBuilder::with_registry
132    #[error(
133        "supervisor builder received both a job registry and a job catalog; choose one registration source"
134    )]
135    MixedRegistrySources,
136    /// The supervisor builder requires a job registry when the worker or reaper
137    /// loop is enabled, but none was provided. Call
138    /// [`SupervisorBuilder::with_registry`] before [`SupervisorBuilder::build`].
139    ///
140    /// [`SupervisorBuilder::with_registry`]: crate::supervisor::SupervisorBuilder::with_registry
141    /// [`SupervisorBuilder::build`]: crate::supervisor::SupervisorBuilder::build
142    #[error(
143        "supervisor builder requires a job registry when worker or reaper loops are enabled \
144         (worker_enabled={worker_enabled}, reaper_enabled={reaper_enabled})"
145    )]
146    MissingRegistry {
147        worker_enabled: bool,
148        reaper_enabled: bool,
149    },
150    /// The supervisor builder must be called from within an active Tokio runtime
151    /// context. Ensure you are calling it inside a `#[tokio::main]`, `#[tokio::test]`,
152    /// or within a spawned task inside an existing runtime.
153    #[error("supervisor builder requires an active Tokio runtime")]
154    MissingTokioRuntime {
155        #[source]
156        source: tokio::runtime::TryCurrentError,
157    },
158    /// A supervised runtime task exited cleanly before shutdown was requested.
159    /// This is treated as an error because long-running loops should only exit
160    /// in response to a shutdown signal. Investigate logs for the specific task
161    /// that exited unexpectedly.
162    #[error("jobs runtime task `{task}` exited unexpectedly before shutdown")]
163    TaskExitedUnexpectedly { task: &'static str },
164    /// A supervised task panicked or failed to join cleanly. Examine logs and
165    /// process panic output for more details about the underlying task failure.
166    #[error("failed joining jobs runtime task `{task}`")]
167    TaskJoin {
168        task: &'static str,
169        #[source]
170        source: tokio::task::JoinError,
171    },
172    /// The supervisor did not complete shutdown within the requested timeout.
173    /// Some tasks may not have received or responded to the shutdown signal.
174    /// Consider increasing the timeout or investigating why tasks are shutting
175    /// down slowly.
176    #[error("jobs runtime shutdown exceeded timeout {timeout:?}")]
177    ShutdownTimeout { timeout: std::time::Duration },
178    /// The requested shutdown timeout is too large to represent as a deadline.
179    /// Use a smaller timeout value.
180    #[error("jobs runtime shutdown timeout {timeout:?} is too large to represent")]
181    ShutdownTimeoutTooLarge { timeout: std::time::Duration },
182    /// A supervised task failed (panicked or exited unexpectedly) and the
183    /// remaining tasks could not be shut down within the timeout. The original
184    /// task failure is available through the source error.
185    #[error(
186        "jobs runtime shutdown exceeded timeout {timeout:?} while draining after earlier task failure"
187    )]
188    ShutdownTimeoutAfterTaskError {
189        timeout: std::time::Duration,
190        #[source]
191        source: Box<RuntimeError>,
192    },
193}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198
199    #[test]
200    fn scheduler_invalid_cron_variant_contains_schedule_metadata() {
201        let schedule_id = uuid::Uuid::nil();
202        let error = SchedulerError::InvalidCronExpression {
203            schedule_id,
204            schedule_name: "nightly sync".to_string(),
205            cron_expr: "not cron".to_string(),
206        };
207
208        match error {
209            SchedulerError::InvalidCronExpression {
210                schedule_id: actual_id,
211                schedule_name,
212                cron_expr,
213            } => {
214                assert_eq!(actual_id, schedule_id);
215                assert_eq!(schedule_name, "nightly sync");
216                assert_eq!(cron_expr, "not cron");
217            }
218            other => panic!("expected invalid cron variant, got: {other:?}"),
219        }
220    }
221}