1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
use serde::{Deserialize, Serialize};
use crate::{job::JobId, request::JobState, worker::WorkerId, Timestamp};
#[cfg(feature = "extensions")]
use crate::error::JobError;
#[cfg(feature = "extensions")]
use http::Extensions;
#[cfg(feature = "extensions")]
use std::{any::Any, marker::Send};
/// Execution metadata tracked for a single job.
///
/// Used to provide a context when a job is defined through the [Job] trait.
///
/// The struct derives `Serialize`/`Deserialize`. The optional `data` field is
/// marked `#[serde(skip)]`, so it is left out when serializing and filled with
/// its `Default` value when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobContext {
/// Identifier of the job this context belongs to.
pub(crate) id: JobId,
/// Current state of the job; `JobContext::new` starts it at `JobState::Pending`.
pub(crate) status: JobState,
/// Time the job is supposed to start; `JobContext::new` sets it to the current UTC time.
pub(crate) run_at: Timestamp,
/// Number of attempts recorded so far; `JobContext::new` starts it at 0.
pub(crate) attempts: i32,
/// Maximum number of attempts for the job; `JobContext::new` defaults it to 25.
pub(crate) max_attempts: i32,
/// Message of the last error recorded for the job, if any.
pub(crate) last_error: Option<String>,
/// Time the job was locked, if any.
pub(crate) lock_at: Option<Timestamp>,
/// Identifier of the worker recorded as having locked the job, if any.
pub(crate) lock_by: Option<WorkerId>,
/// Time the job was done, if any.
pub(crate) done_at: Option<Timestamp>,
/// Typed values attached to the job (only with the `extensions` feature).
/// Never serialized, and a cloned context starts with an empty store
/// because `Data::clone` does not copy the stored values.
#[cfg(feature = "extensions")]
#[serde(skip)]
pub(crate) data: Data,
}
/// Newtype over [`Extensions`] holding the typed values attached to a `JobContext`.
///
/// Only compiled with the `extensions` feature. The derived `Default` yields
/// an empty store.
#[cfg(feature = "extensions")]
#[derive(Debug, Default)]
pub(crate) struct Data(Extensions);
#[cfg(feature = "extensions")]
impl Clone for Data {
    /// Returns a fresh, empty store.
    ///
    /// The values held by `self` are deliberately *not* copied: a clone only
    /// exists so that the surrounding `JobContext` can derive `Clone`.
    fn clone(&self) -> Self {
        // The derived `Default` wraps an empty `Extensions`, which is exactly
        // what a clone is expected to contain.
        Self::default()
    }
}
impl JobContext {
    /// Build a new context with defaults given an ID.
    ///
    /// The context starts as [`JobState::Pending`], scheduled to run at the
    /// current UTC time, with `0` attempts out of a maximum of `25`, and with
    /// no lock, completion time or error recorded.
    ///
    /// With the `extensions` feature enabled, a clone of `id` is also stored
    /// in the context's data, so it can be fetched back by type.
    #[must_use]
    pub fn new(id: JobId) -> Self {
        // Seed the extension store with a copy of the id before `id` itself
        // is moved into the struct below.
        #[cfg(feature = "extensions")]
        let data = {
            let mut data = Data::default();
            data.0.insert(id.clone());
            data
        };
        JobContext {
            id,
            status: JobState::Pending,
            // `chrono` takes precedence when both `chrono` and `time` are enabled.
            #[cfg(feature = "chrono")]
            run_at: chrono::Utc::now(),
            #[cfg(all(not(feature = "chrono"), feature = "time"))]
            run_at: time::OffsetDateTime::now_utc(),
            lock_at: None,
            done_at: None,
            attempts: 0,
            max_attempts: 25,
            last_error: None,
            lock_by: None,
            #[cfg(feature = "extensions")]
            data,
        }
    }

    /// Get an optional reference to a type previously inserted on this `JobContext`.
    ///
    /// Returns `None` when no value of type `D` has been inserted.
    ///
    /// # Example
    ///
    /// ```
    /// # use apalis_core::context::JobContext;
    /// # use apalis_core::job::JobId;
    /// let mut ctx = JobContext::new(JobId::new());
    /// assert!(ctx.data_opt::<i32>().is_none());
    /// ctx.insert(5i32);
    ///
    /// assert_eq!(ctx.data_opt::<i32>(), Some(&5i32));
    /// ```
    #[cfg(feature = "extensions")]
    #[must_use]
    pub fn data_opt<D: Any + Send + Sync>(&self) -> Option<&D> {
        self.data.0.get()
    }

    /// Get a reference to a type previously inserted on this `JobContext`.
    ///
    /// # Errors
    /// Returns [`JobError::MissingContext`], naming the requested type, if no
    /// value of that type is in the `JobContext`.
    ///
    /// # Example
    ///
    /// ```
    /// # use apalis_core::context::JobContext;
    /// # use apalis_core::job::JobId;
    /// let mut ctx = JobContext::new(JobId::new());
    /// assert!(ctx.data::<i32>().is_err());
    /// assert_eq!(
    ///     ctx.data::<i32>().unwrap_err().to_string(),
    ///     "MissingContext: Attempted to fetch context of i32. Did you add `.layer(Extension(i32))"
    /// );
    /// ctx.insert(5i32);
    ///
    /// assert_eq!(ctx.data::<i32>().unwrap(), &5i32);
    /// ```
    #[cfg(feature = "extensions")]
    pub fn data<D: Any + Send + Sync>(&self) -> Result<&D, JobError> {
        // Build the error lazily: the message is only formatted (and
        // allocated) when the lookup actually fails.
        self.data.0.get().ok_or_else(|| {
            JobError::MissingContext(format!(
                "Attempted to fetch context of {0}. Did you add `.layer(Extension({0}))",
                std::any::type_name::<D>()
            ))
        })
    }

    /// Insert a type into this `JobContext`.
    ///
    /// Important for embedding data for a job.
    /// If an extension of this type already existed, it will be returned.
    ///
    /// # Example
    ///
    /// ```
    /// # use apalis_core::context::JobContext;
    /// # use apalis_core::job::JobId;
    /// let mut ctx = JobContext::new(JobId::new());
    /// assert!(ctx.insert(5i32).is_none());
    /// assert!(ctx.insert(4u8).is_none());
    /// assert_eq!(ctx.insert(9i32), Some(5i32));
    /// ```
    #[cfg(feature = "extensions")]
    pub fn insert<D: Any + Send + Sync + Clone>(&mut self, data: D) -> Option<D> {
        self.data.0.insert(data)
    }

    /// Set the maximum number of attempts for a job
    pub fn set_max_attempts(&mut self, max_attempts: i32) {
        self.max_attempts = max_attempts;
    }

    /// Gets the maximum attempts for a job. Default 25
    pub fn max_attempts(&self) -> i32 {
        self.max_attempts
    }

    /// Get the id for a job
    pub fn id(&self) -> &JobId {
        &self.id
    }

    /// Gets the current attempts for a job. Default 0
    pub fn attempts(&self) -> i32 {
        self.attempts
    }

    /// Set the current number of attempts
    pub fn set_attempts(&mut self, attempts: i32) {
        self.attempts = attempts;
    }

    /// Get the time a job was done
    pub fn done_at(&self) -> &Option<Timestamp> {
        &self.done_at
    }

    /// Set the time a job was done
    pub fn set_done_at(&mut self, done_at: Option<Timestamp>) {
        self.done_at = done_at;
    }

    /// Get the time a job is supposed to start
    pub fn run_at(&self) -> &Timestamp {
        &self.run_at
    }

    /// Set the time a job should run
    pub fn set_run_at(&mut self, run_at: Timestamp) {
        self.run_at = run_at;
    }

    /// Get the time a job was locked
    pub fn lock_at(&self) -> &Option<Timestamp> {
        &self.lock_at
    }

    /// Set the time a job was locked
    pub fn set_lock_at(&mut self, lock_at: Option<Timestamp>) {
        self.lock_at = lock_at;
    }

    /// Get the job status
    pub fn status(&self) -> &JobState {
        &self.status
    }

    /// Set the job status
    pub fn set_status(&mut self, status: JobState) {
        self.status = status;
    }

    /// Get the id of the worker that locked the job, if any
    pub fn lock_by(&self) -> &Option<WorkerId> {
        &self.lock_by
    }

    /// Set the id of the worker that locked the job
    pub fn set_lock_by(&mut self, lock_by: Option<WorkerId>) {
        self.lock_by = lock_by;
    }

    /// Get the last error recorded for the job, if any
    pub fn last_error(&self) -> &Option<String> {
        &self.last_error
    }

    /// Set the last error, replacing any previously recorded one
    pub fn set_last_error(&mut self, error: String) {
        self.last_error = Some(error);
    }
}
/// Access to the [`JobContext`] carried by a request.
///
/// Implement this trait on your own request types so that code which needs
/// the job context can borrow it from them.
pub trait HasJobContext {
    /// Borrows the job context immutably.
    fn context(&self) -> &JobContext;

    /// Borrows the job context mutably.
    fn context_mut(&mut self) -> &mut JobContext;
}