1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use deadpool_sqlite::InteractError;
/// A [`std::result::Result`] alias whose error type parameter defaults to [`Error`].
///
/// This lets fallible functions be written as `Result<T>`. A different error type can still be
/// supplied explicitly as the second type parameter.
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors that can be returned from the queue.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// An error occurred while updating the database to a new schema version.
#[error("Migration error: {0}")]
Migration(#[from] rusqlite_migration::Error),
/// An error occurred while opening the database.
#[error("Error opening database: {0}")]
OpenDatabase(eyre::Report),
/// Failed to acquire a database connection for reading.
#[error("Error acquiring database connection: {0}")]
PoolError(#[from] deadpool_sqlite::PoolError),
/// Encountered an error communicating with the database.
#[error("Database error: {0}")]
Database(#[from] rusqlite::Error),
/// The database contained invalid data.
#[error("Unexpected value type for {1}: {0}")]
ColumnType(#[source] rusqlite::Error, &'static str),
/// An internal task panicked.
#[error("Internal error: {0}")]
Panic(#[from] tokio::task::JoinError),
/// An internal error occurred while reading the database.
#[error("Internal error: {0}")]
DbInteract(String),
/// When requesting a job status, the job ID was not found.
#[error("Job not found")]
NotFound,
/// A job had an unknown state value
#[error("Invalid job state {0}")]
InvalidJobState(String),
/// Failed to serialize or deserialize information when recording information about a job run.
#[error("Error decoding job run info {0}")]
InvalidJobRunInfo(serde_json::Error),
/// Failed to serialize or deserialize a job payload
#[error("Error processing payload: {0}")]
PayloadError(serde_json::Error),
/// Invalid value for a job timestamp
#[error("Timestamp {0} out of range")]
TimestampOutOfRange(&'static str),
/// A worker attempted to finish a job more than once.
#[error("Attempted to finish already-finished job")]
JobAlreadyConsumed,
/// The operation timed out. This is mostly used when the queue fails to shut down in a timely
/// fashion.
#[error("Timed out")]
Timeout,
/// The job could not be modified or cancelled because it is currently running.
#[error("Job is running")]
JobRunning,
/// The job could not be modified or cancelled because it has already finished.
#[error("Job is finished")]
JobFinished,
/// The current job has expired.
#[error("Job expired")]
Expired,
/// An unregistered worker tried to communicate with the queue.
#[error("Worker {0} not found")]
WorkerNotFound(u64),
/// Indicates that the queue has closed, and so the attempted operation could not be completed.
#[error("Queue closed unexpectedly")]
QueueClosed,
/// The passed schedule specification is invalid
#[error("Invalid schedule specification")]
InvalidSchedule,
/// A recurring job with the same ID already exists
#[error("Recurring job {0} already exists")]
RecurringJobAlreadyExists(String),
}
impl Error {
/// Returns if the error indicates that an update or cancel request occurred too late
pub fn is_update_too_late(&self) -> bool {
matches!(self, Error::JobRunning | Error::JobFinished)
}
}
impl From<InteractError> for Error {
fn from(e: InteractError) -> Self {
Error::DbInteract(e.to_string())
}
}
impl Error {
    /// Wraps any value convertible into an [`eyre::Report`] as [`Error::OpenDatabase`].
    pub(crate) fn open_database(err: impl Into<eyre::Report>) -> Self {
        let report: eyre::Report = err.into();
        Self::OpenDatabase(report)
    }
}