use std::convert::Infallible;
use apalis_core::{
task::{Task, metadata::MetadataExt},
task_fn::FromRequest,
};
type JsonMapMetadata = serde_json::Map<String, serde_json::Value>;
use serde::{
Deserialize, Serialize,
de::{DeserializeOwned, Error},
};
#[derive(Debug, Serialize, Deserialize)]
pub struct SqlContext<Pool> {
max_attempts: i32,
last_result: Option<serde_json::Value>,
lock_at: Option<i64>,
lock_by: Option<String>,
done_at: Option<i64>,
priority: i32,
queue: Option<String>,
meta: JsonMapMetadata,
_pool: std::marker::PhantomData<Pool>,
}
impl<Pool> Clone for SqlContext<Pool> {
fn clone(&self) -> Self {
Self {
lock_at: self.lock_at,
done_at: self.done_at,
max_attempts: self.max_attempts,
last_result: self.last_result.clone(),
lock_by: self.lock_by.clone(),
priority: self.priority,
queue: self.queue.clone(),
meta: self.meta.clone(),
_pool: std::marker::PhantomData,
}
}
}
impl<Pool> Default for SqlContext<Pool> {
fn default() -> Self {
Self::new()
}
}
impl<Pool> SqlContext<Pool> {
#[must_use]
pub fn new() -> Self {
Self {
lock_at: None,
done_at: None,
max_attempts: 5,
last_result: None,
lock_by: None,
priority: 0,
queue: None,
meta: Default::default(),
_pool: std::marker::PhantomData,
}
}
#[must_use]
pub fn with_max_attempts(mut self, max_attempts: i32) -> Self {
self.max_attempts = max_attempts;
self
}
#[must_use]
pub fn max_attempts(&self) -> i32 {
self.max_attempts
}
#[must_use]
pub fn done_at(&self) -> &Option<i64> {
&self.done_at
}
#[must_use]
pub fn with_done_at(mut self, done_at: Option<i64>) -> Self {
self.done_at = done_at;
self
}
#[must_use]
pub fn lock_at(&self) -> &Option<i64> {
&self.lock_at
}
#[must_use]
pub fn with_lock_at(mut self, lock_at: Option<i64>) -> Self {
self.lock_at = lock_at;
self
}
#[must_use]
pub fn lock_by(&self) -> &Option<String> {
&self.lock_by
}
#[must_use]
pub fn with_lock_by(mut self, lock_by: Option<String>) -> Self {
self.lock_by = lock_by;
self
}
#[must_use]
pub fn last_result(&self) -> &Option<serde_json::Value> {
&self.last_result
}
#[must_use]
pub fn with_last_result(mut self, result: Option<serde_json::Value>) -> Self {
self.last_result = result;
self
}
#[must_use]
pub fn with_priority(mut self, priority: i32) -> Self {
self.priority = priority;
self
}
#[must_use]
pub fn priority(&self) -> i32 {
self.priority
}
#[must_use]
pub fn queue(&self) -> &Option<String> {
&self.queue
}
#[must_use]
pub fn with_queue(mut self, queue: String) -> Self {
self.queue = Some(queue);
self
}
#[must_use]
pub fn meta(&self) -> &JsonMapMetadata {
&self.meta
}
#[must_use]
pub fn with_meta(mut self, meta: JsonMapMetadata) -> Self {
self.meta = meta;
self
}
}
impl<Args: Sync, IdType: Sync, Pool: Sync> FromRequest<Task<Args, Self, IdType>>
for SqlContext<Pool>
{
type Error = Infallible;
async fn from_request(req: &Task<Args, Self, IdType>) -> Result<Self, Self::Error> {
Ok(req.parts.ctx.clone())
}
}
impl<T: DeserializeOwned + Serialize, Pool> MetadataExt<T> for SqlContext<Pool> {
type Error = serde_json::Error;
fn extract(&self) -> Result<T, Self::Error> {
self.meta
.get(std::any::type_name::<T>())
.and_then(|v| T::deserialize(v).ok())
.ok_or(serde_json::Error::custom("Failed to extract metadata"))
}
fn inject(&mut self, value: T) -> Result<(), Self::Error> {
self.meta.insert(
std::any::type_name::<T>().to_owned(),
serde_json::to_value(&value).unwrap(),
);
Ok(())
}
}