1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use deadpool_sqlite::InteractError;

/// A [std::result::Result] whose error type defaults to [Error].
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// Errors that can be returned from the queue.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An error occurred while updating the database to a new schema version.
    #[error("Migration error: {0}")]
    Migration(#[from] rusqlite_migration::Error),
    /// An error occurred while opening the database.
    #[error("Error opening database: {0}")]
    OpenDatabase(eyre::Report),
    /// Failed to acquire a database connection for reading.
    #[error("Error acquiring database connection: {0}")]
    PoolError(#[from] deadpool_sqlite::PoolError),
    /// Encountered an error communicating with the database.
    #[error("Database error: {0}")]
    Database(#[from] rusqlite::Error),
    /// The database contained invalid data.
    #[error("Unexpected value type for {1}: {0}")]
    ColumnType(#[source] rusqlite::Error, &'static str),
    /// An internal task panicked.
    #[error("Internal error: {0}")]
    Panic(#[from] tokio::task::JoinError),
    /// An internal error occurred while reading the database.
    #[error("Internal error: {0}")]
    DbInteract(String),
    /// When requesting a job status, the job ID was not found.
    #[error("Job not found")]
    NotFound,
    /// A job had an unknown state value
    #[error("Invalid job state {0}")]
    InvalidJobState(String),
    /// Failed to serialize or deserialize information when recording information about a job run.
    #[error("Error decoding job run info {0}")]
    InvalidJobRunInfo(serde_json::Error),
    /// Failed to serialize or deserialize a job payload
    #[error("Error processing payload: {0}")]
    PayloadError(serde_json::Error),
    /// Invalid value for a job timestamp
    #[error("Timestamp {0} out of range")]
    TimestampOutOfRange(&'static str),
    /// A worker attempted to finish a job more than once.
    #[error("Attempted to finish already-finished job")]
    JobAlreadyConsumed,
    /// The operation timed out. This is mostly used when the queue fails to shut down in a timely
    /// fashion.
    #[error("Timed out")]
    Timeout,
    /// The job could not be modified or cancelled because it is currently running.
    #[error("Job is running")]
    JobRunning,
    /// The job could not be modified or cancelled because it has already finished.
    #[error("Job is finished")]
    JobFinished,
    /// The current job has expired.
    #[error("Job expired")]
    Expired,
    /// An unregistered worker tried to communicate with the queue.
    #[error("Worker {0} not found")]
    WorkerNotFound(u64),
    /// Indicates that the queue has closed, and so the attempted operation could not be completed.
    #[error("Queue closed unexpectedly")]
    QueueClosed,
    /// The passed schedule specification is invalid
    #[error("Invalid schedule specification")]
    InvalidSchedule,
    /// A recurring job with the same ID already exists
    #[error("Recurring job {0} already exists")]
    RecurringJobAlreadyExists(String),
}

impl From<InteractError> for Error {
    fn from(e: InteractError) -> Self {
        Error::DbInteract(e.to_string())
    }
}

impl Error {
    /// Builds an [`Error::OpenDatabase`] from any value convertible into an [`eyre::Report`].
    pub(crate) fn open_database(err: impl Into<eyre::Report>) -> Self {
        let report: eyre::Report = err.into();
        Self::OpenDatabase(report)
    }
}