1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
#![deny(missing_docs)]

//! # Background Jobs Core
//! _basic types and traits for implementing a background jobs processor_
//!
//! This crate shouldn't be depended on directly, except in the case of implementing a custom jobs
//! processor. For a default solution based on Actix and Sled, look at the `background-jobs` crate.

use anyhow::Error;

#[cfg(feature = "with-actix")]
mod actix_job;
mod job;
mod job_info;
mod processor_map;
mod stats;
mod storage;

pub use crate::{
    job::{new_job, new_scheduled_job, process, Job},
    job_info::{JobInfo, NewJobInfo, ReturnJobInfo},
    processor_map::{CachedProcessorMap, ProcessorMap},
    stats::{JobStat, Stats},
    storage::{memory_storage, Storage},
};

#[cfg(feature = "with-actix")]
pub use actix_job::ActixJob;

#[derive(Debug, thiserror::Error)]
/// The error type returned by the `process` method
///
/// Each variant carries its own `Display` message through the `#[error(...)]` attribute.
pub enum JobError {
    /// Some error occurred while processing the job
    ///
    /// Wraps an `anyhow::Error`. The `#[from]` attribute generates the `From<anyhow::Error>`
    /// conversion, so the `?` operator can produce this variant directly.
    #[error("Error performing job: {0}")]
    Processing(#[from] Error),

    /// Creating a `Job` type from the provided `serde_json::Value` failed
    ///
    /// This is also the variant produced when converting from a `serde_json` error; the
    /// details of the underlying error are not retained.
    #[error("Could not make JSON value from arguments")]
    Json,

    /// This job type was not registered for this client
    #[error("This job type was not registered for the client")]
    Unregistered,
}

#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
/// Indicate the state of a job after an attempted run
///
/// This type can be serialized and deserialized with serde.
pub enum JobResult {
    /// The job succeeded
    Success,

    /// The job failed
    Failure,

    /// The worker had no concept of this job
    Unregistered,

    /// The worker requesting this job closed
    Unexecuted,
}

impl JobResult {
    /// Indicate a successful job
    pub fn success() -> Self {
        JobResult::Success
    }

    /// Indicate a failed job
    pub fn failure() -> Self {
        JobResult::Failure
    }

    /// Indicate that the job was not registered for this worker
    pub fn unregistered() -> Self {
        JobResult::Unregistered
    }

    /// Indicate that the job was returned without an execution attempt
    pub fn unexecuted() -> Self {
        JobResult::Unexecuted
    }

    /// Check if the job failed
    pub fn is_failure(&self) -> bool {
        matches!(self, JobResult::Failure)
    }

    /// Check if the job succeeded
    pub fn is_success(&self) -> bool {
        matches!(self, JobResult::Success)
    }

    /// Check if the job is missing its processor
    pub fn is_unregistered(&self) -> bool {
        matches!(self, JobResult::Unregistered)
    }

    /// Check if the job was returned without an execution attempt
    pub fn is_unexecuted(&self) -> bool {
        matches!(self, JobResult::Unexecuted)
    }
}

#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
/// Set the status of a job when storing it
///
/// The `Display` and `FromStr` implementations use the variant names, `"Pending"` and
/// `"Running"`, as the textual form.
pub enum JobStatus {
    /// Job should be queued
    Pending,

    /// Job is running
    Running,
}

impl JobStatus {
    /// The job should be queued
    pub fn pending() -> Self {
        JobStatus::Pending
    }

    /// The job is running
    pub fn running() -> Self {
        JobStatus::Running
    }

    /// Check if the job is ready to be queued
    pub fn is_pending(&self) -> bool {
        *self == JobStatus::Pending
    }

    /// Check if the job is running
    pub fn is_running(&self) -> bool {
        *self == JobStatus::Running
    }
}

impl std::fmt::Display for JobStatus {
    /// Write the variant name (`Pending` or `Running`) to the formatter
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let label = match self {
            JobStatus::Pending => "Pending",
            JobStatus::Running => "Running",
        };

        f.write_str(label)
    }
}

#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd, thiserror::Error)]
#[error("Invalid job status")]
/// The error generated when parsing a job's status if it's not 'Pending' or 'Running'
///
/// This is a unit struct: it carries no information about the rejected input, and its
/// `Display` output is always "Invalid job status".
pub struct JobStatusError;

impl std::str::FromStr for JobStatus {
    type Err = JobStatusError;

    /// Parse a status from its exact textual name
    ///
    /// Only the strings `"Pending"` and `"Running"` are accepted. The comparison is
    /// case-sensitive and no trimming is performed; any other input yields `JobStatusError`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "Pending" {
            return Ok(Self::Pending);
        }

        if s == "Running" {
            return Ok(Self::Running);
        }

        Err(JobStatusError)
    }
}

#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
/// Different styles for retrying jobs
///
/// Both variants carry a single `usize`; how it is interpreted depends on the variant.
pub enum Backoff {
    /// Seconds between execution
    ///
    /// For example, `Backoff::Linear(5)` will retry a failed job 5 seconds after the previous
    /// attempt
    Linear(usize),

    /// Base for seconds between execution
    ///
    /// For example, `Backoff::Exponential(2)` will retry a failed job 2 seconds after the first
    /// failure, 4 seconds after the second failure, 8 seconds after the third failure, and 16
    /// seconds after the fourth failure
    Exponential(usize),
}

#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
/// How many times a job should be retried before giving up
pub enum MaxRetries {
    /// Keep retrying forever
    Infinite,

    /// Put a limit on the number of retries
    ///
    /// A job is allowed back into the queue while its retry count is less than or equal to
    /// this value.
    Count(usize),
}

impl MaxRetries {
    /// Decide whether a job with the given retry count may be requeued
    ///
    /// `Infinite` always allows a requeue. `Count(limit)` allows a requeue while
    /// `retry_count` is less than or equal to `limit`, and reports `LimitReached` otherwise.
    fn compare(&self, retry_count: u32) -> ShouldStop {
        let within_limit = match self {
            MaxRetries::Infinite => true,
            MaxRetries::Count(limit) => (retry_count as usize) <= *limit,
        };

        if within_limit {
            ShouldStop::Requeue
        } else {
            ShouldStop::LimitReached
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq)]
/// A type that represents whether a job should be requeued
///
/// This is the value produced when a job's retry count is compared against its `MaxRetries`.
pub enum ShouldStop {
    /// The job has hit the maximum allowed number of retries, and should be failed permanently
    LimitReached,

    /// The job is allowed to be put back into the job queue
    Requeue,
}

impl ShouldStop {
    /// A boolean representation of this state
    pub fn should_requeue(&self) -> bool {
        *self == ShouldStop::Requeue
    }
}

impl From<serde_json::error::Error> for JobError {
    fn from(_: serde_json::error::Error) -> Self {
        JobError::Json
    }
}