use log::{debug, info};
use redis::{aio::ConnectionLike, AsyncCommands, Pipeline, RedisWrite, ToRedisArgs};
use super::{RedisQueue, RedisManager};
use crate::models::{job, DateTime, OcyError, OcyResult};
use crate::transaction_async;
/// Handle to a single job stored in Redis.
///
/// Pairs the job's numeric ID with the Redis key derived from it, and borrows the
/// `RedisManager` whose key names (job prefix, running/failed/ended lists, statistics
/// counters) are used by the operations on the job.
#[derive(Debug)]
pub struct RedisJob<'a> {
// Manager supplying the shared key names used when building commands for this job.
redis_manager: &'a RedisManager,
/// Numeric ID of the job.
pub id: u64,
/// Redis key of the job: the manager's `job_prefix` immediately followed by `id`.
pub key: String,
}
impl<'a> ToRedisArgs for &RedisJob<'a> {
fn to_redis_args(&self) -> Vec<Vec<u8>> {
self.key.to_redis_args()
}
fn write_redis_args<W: ?Sized + RedisWrite>(&self, out: &mut W) {
self.key.write_redis_args(out)
}
}
impl<'a> RedisJob<'a> {
/// Creates a handle for the job with the given ID.
///
/// The job's Redis key is computed once here from the manager's `job_prefix` and `id`;
/// no Redis command is issued.
pub fn new(redis_manager: &'a RedisManager, id: u64) -> Self {
    let key = Self::build_key(&redis_manager.job_prefix, id);
    Self {
        redis_manager,
        id,
        key,
    }
}
/// Fetches the requested metadata fields of this job.
///
/// When `Field::Ended` is requested, the `Retries`, `RetriesAttempted` and `Status` fields
/// are fetched as well if the caller did not ask for them; those extra fields are handed to
/// `JobMeta::from_redis_value` as hidden fields.
///
/// # Errors
///
/// Returns `OcyError::NoSuchJob` when the job hash has no `Id` field, and propagates Redis
/// and conversion errors.
pub async fn metadata<C>(&self, conn: &mut C, fields: &[job::Field]) -> OcyResult<job::JobMeta>
where
    C: ConnectionLike,
{
    let mut requested: Vec<job::Field> = fields.to_vec();
    let mut hidden_fields: Vec<job::Field> = Vec::new();

    if requested.contains(&job::Field::Ended) {
        // Fields fetched alongside `Ended`; any the caller did not request are tracked as hidden.
        let dependencies = vec![
            job::Field::Retries,
            job::Field::RetriesAttempted,
            job::Field::Status,
        ];
        for dependency in dependencies {
            if !requested.contains(&dependency) {
                hidden_fields.push(dependency.clone());
                requested.push(dependency);
            }
        }
    }

    // The `Id` lookup is only used to detect whether the job exists.
    let (id, value): (Option<u64>, redis::Value) = redis::pipe()
        .atomic()
        .hget(&self.key, job::Field::Id)
        .hget(&self.key, requested.as_slice())
        .query_async(conn)
        .await?;

    match id {
        None => Err(OcyError::NoSuchJob(self.id)),
        Some(_) => {
            let meta = job::JobMeta::from_redis_value(&requested, &value, &hidden_fields)?;
            Ok(meta)
        }
    }
}
/// Loads the timeout-related metadata of this job via `TimeoutMeta::from_conn`.
///
/// # Errors
///
/// Propagates any error returned by `TimeoutMeta::from_conn`.
pub async fn timeout_metadata<C>(&self, conn: &mut C) -> OcyResult<job::TimeoutMeta>
where
    C: ConnectionLike + Send,
{
    let meta = job::TimeoutMeta::from_conn(conn, self).await?;
    Ok(meta)
}
pub fn id(&self) -> u64 {
self.id
}
/// Returns the Redis key under which this job is stored.
pub fn key(&self) -> &str {
    self.key.as_str()
}
/// Builds a job's Redis key by appending the decimal job ID to the given prefix.
fn build_key(job_key_prefix: &str, id: u64) -> String {
    let suffix = id.to_string();
    let mut key = String::with_capacity(job_key_prefix.len() + suffix.len());
    key.push_str(job_key_prefix);
    key.push_str(&suffix);
    key
}
/// Applies an update request (new output and/or new status) to this job.
///
/// Both changes are queued on a single atomic pipeline and executed inside
/// `transaction_async!` with the job key as the watched key. The output is queued before
/// the status change.
///
/// # Errors
///
/// Propagates errors from `set_output_in_pipe`, `set_status_in_pipe` and from executing
/// the pipeline.
pub async fn update<C>(&self, conn: &mut C, update_req: &job::UpdateRequest) -> OcyResult<()>
where
    C: ConnectionLike + Send,
{
    debug!("[{}] update request: {:?}", &self.key, update_req);
    let _: () = transaction_async!(conn, &[&self.key], {
        let mut txn = redis::pipe();
        let queued = txn.atomic();
        if let Some(output) = update_req.output.as_ref() {
            self.set_output_in_pipe(conn, queued, output).await?;
        }
        if let Some(new_status) = update_req.status.as_ref() {
            self.set_status_in_pipe(conn, queued, new_status).await?;
            info!("[{}] {}", &self.key, new_status);
        }
        txn.query_async(conn).await?
    });
    Ok(())
}
/// Queues the commands that mark this job as completed.
///
/// Sets the status to `Completed`, records `EndedAt`, removes the job from the running
/// list, appends it to the ended list and increments the completed-jobs counter. Nothing is
/// sent to Redis here; the caller executes `pipe`.
///
/// The job is appended with `RPUSH`, matching how `cancel` and `end_failed` add jobs to the
/// ended list, so that all ended jobs are stored in a consistent order.
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_lifetimes))]
pub fn complete<'b>(&self, pipe: &'b mut Pipeline) -> &'b mut Pipeline {
    pipe.hset(&self.key, job::Field::Status, job::Status::Completed)
        .hset(&self.key, job::Field::EndedAt, DateTime::now())
        .lrem(&self.redis_manager.running_key, 1, self.id)
        .rpush(&self.redis_manager.ended_key, self.id)
        .incr(&self.redis_manager.stat_jobs_completed_key, 1)
}
/// Queues the commands that put this job back on its queue.
///
/// Looks up the job's queue and calls `ensure_exists` on it, then queues on `pipe`:
/// deletion of the `StartedAt`, `EndedAt`, `LastHeartbeat` and `Output` fields, a status
/// change to `Queued`, removal from the failed and ended lists, an `LPUSH` onto the queue's
/// jobs list and an increment of the retried-jobs counter. When `incr_retries` is set, the
/// job's `RetriesAttempted` field is incremented as well.
///
/// # Errors
///
/// Propagates errors from looking up the queue or from `ensure_exists`.
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_lifetimes))]
pub async fn requeue<'b, C: ConnectionLike + Send>(
    &self,
    conn: &mut C,
    pipe: &'b mut Pipeline,
    incr_retries: bool,
) -> OcyResult<&'b mut Pipeline> {
    let owning_queue = self.queue(conn).await?;
    let target_queue = owning_queue.ensure_exists(conn).await?;
    let manager = self.redis_manager;

    // Fields describing the previous run, cleared before the job is queued again.
    let run_fields = [
        job::Field::StartedAt,
        job::Field::EndedAt,
        job::Field::LastHeartbeat,
        job::Field::Output,
    ];
    pipe.hdel(&self.key, &run_fields)
        .hset(&self.key, job::Field::Status, job::Status::Queued)
        .lrem(&manager.failed_key, 1, self.id)
        .lrem(&manager.ended_key, 1, self.id)
        .lpush(&target_queue.jobs_key, self.id)
        .incr(&manager.stat_jobs_retried_key, 1);

    if incr_retries {
        pipe.hincr(&self.key, job::Field::RetriesAttempted, 1);
    }
    Ok(pipe)
}
/// Queues the commands that cancel this job.
///
/// Sets the status to `Cancelled`, records `EndedAt`, removes the job from the running
/// list, the failed list and its own queue's jobs list, appends it to the ended list and
/// increments the cancelled-jobs counter. Nothing is executed here; the caller runs `pipe`.
///
/// # Errors
///
/// Propagates errors from looking up the job's queue (including `NoSuchJob`).
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_lifetimes))]
pub async fn cancel<'b, C: ConnectionLike + Send>(
    &self,
    conn: &mut C,
    pipe: &'b mut Pipeline,
) -> OcyResult<&'b mut Pipeline> {
    let owning_queue = self.queue(conn).await?;
    let manager = self.redis_manager;

    pipe.hset(&self.key, job::Field::Status, job::Status::Cancelled)
        .hset(&self.key, job::Field::EndedAt, DateTime::now());
    // The job is removed from every list it could currently be in.
    pipe.lrem(&manager.running_key, 1, self.id)
        .lrem(&manager.failed_key, 1, self.id)
        .lrem(&owning_queue.jobs_key, 1, self.id);
    pipe.rpush(&manager.ended_key, self.id)
        .incr(&manager.stat_jobs_cancelled_key, 1);
    Ok(pipe)
}
/// Queues the commands that mark this job as failed or timed out.
///
/// Sets the status to `status`, records `EndedAt`, removes the job from the running list,
/// appends it to the failed list and increments the statistics counter matching `status`.
///
/// # Panics
///
/// Panics if `status` is neither `Status::TimedOut` nor `Status::Failed`.
pub fn fail<'b>(&self, pipe: &'b mut Pipeline, status: &job::Status) -> &'b mut Pipeline {
    assert!(status == &job::Status::TimedOut || status == &job::Status::Failed);
    let manager = self.redis_manager;
    let stats_key = match status {
        job::Status::Failed => &manager.stat_jobs_failed_key,
        job::Status::TimedOut => &manager.stat_jobs_timed_out_key,
        _ => panic!("fail() was called with invalid status of: {}", status),
    };

    pipe.hset(&self.key, job::Field::Status, status)
        .hset(&self.key, job::Field::EndedAt, DateTime::now())
        .lrem(&manager.running_key, 1, self.id)
        .rpush(&manager.failed_key, self.id)
        .incr(stats_key, 1);
    pipe
}
/// Moves a failed or timed-out job from the failed list to the ended list.
///
/// Runs inside `transaction_async!` with the job key as the watched key.
///
/// Returns `Ok(true)` when the move was executed, and `Ok(false)` when the job has a
/// different status or no longer exists.
///
/// # Errors
///
/// Propagates any error other than `NoSuchJob` from reading the status, and errors from
/// executing the pipeline.
pub async fn end_failed<C: ConnectionLike + Send>(&self, conn: &mut C) -> OcyResult<bool> {
    let moved: bool = transaction_async!(conn, &[&self.key], {
        match self.status(conn).await {
            Ok(job::Status::Failed) | Ok(job::Status::TimedOut) => {
                let mut txn = redis::pipe();
                let outcome: Option<()> = txn
                    .atomic()
                    .lrem(&self.redis_manager.failed_key, 1, self.id)
                    .rpush(&self.redis_manager.ended_key, self.id)
                    .query_async(conn)
                    .await?;
                outcome.map(|()| true)
            }
            // Status is something else, or the job is gone: nothing to move.
            Ok(_) | Err(OcyError::NoSuchJob(_)) => Some(false),
            Err(other) => return Err(other),
        }
    });
    Ok(moved)
}
/// Fetches some or all metadata fields of this job.
///
/// When `fields` is `None`, the list returned by `job::Field::all_fields()` is used.
///
/// # Errors
///
/// Propagates the errors of `metadata`.
pub async fn fields<C: ConnectionLike>(
    &self,
    conn: &mut C,
    fields: Option<&[job::Field]>,
) -> OcyResult<job::JobMeta> {
    debug!("Fetching job info for job_id={}", self.id);
    let requested = match fields {
        Some(subset) => subset,
        None => job::Field::all_fields(),
    };
    self.metadata(conn, requested).await
}
/// Returns a handle to the queue named in this job's `Queue` field.
///
/// # Errors
///
/// Returns `OcyError::NoSuchJob` when the job hash has no `Queue` field, and propagates
/// Redis errors and errors from `RedisQueue::new`.
pub async fn queue<C: ConnectionLike + Send>(&self, conn: &mut C) -> OcyResult<RedisQueue<'a>> {
    let stored: Option<String> = conn.hget(&self.key, job::Field::Queue).await?;
    let queue_name = stored.ok_or(OcyError::NoSuchJob(self.id))?;
    let queue = RedisQueue::new(self.redis_manager, queue_name)?;
    Ok(queue)
}
/// Returns the output stored for this job, parsed as JSON.
///
/// A job that exists but has no `Output` field yields `serde_json::Value::Null`.
///
/// # Errors
///
/// Returns `OcyError::NoSuchJob` when the job key does not exist, and propagates Redis and
/// JSON parsing errors.
pub async fn output<C: ConnectionLike + Send>(
    &self,
    conn: &mut C,
) -> OcyResult<serde_json::Value> {
    debug!("Getting job output for job_id={}", self.id);
    let (exists, output): (bool, Option<String>) = redis::pipe()
        .atomic()
        .exists(&self.key)
        .hget(&self.key, job::Field::Output)
        .query_async(conn)
        .await?;

    if !exists {
        return Err(OcyError::NoSuchJob(self.id));
    }
    match output {
        None => Ok(serde_json::Value::Null),
        Some(raw) => {
            let parsed: serde_json::Value = serde_json::from_str(&raw)?;
            Ok(parsed)
        }
    }
}
/// Stores `value` as the output of this job.
///
/// The write is queued through `set_output_in_pipe` and executed inside
/// `transaction_async!` with the job key as the watched key.
///
/// # Errors
///
/// Propagates the errors of `set_output_in_pipe` and of executing the pipeline.
pub async fn set_output<C: ConnectionLike + Send>(
    &self,
    conn: &mut C,
    value: &serde_json::Value,
) -> OcyResult<()> {
    let _: () = transaction_async!(conn, &[&self.key], {
        self.set_output_in_pipe(conn, redis::pipe().atomic(), value)
            .await?
            .query_async(conn)
            .await?
    });
    Ok(())
}
/// Queues on `pipe` the command that stores `value` as this job's output.
///
/// The value is stored in the `Output` field as its JSON string form. The job's current
/// status is read from `conn` first; nothing is executed on `pipe` here.
///
/// # Errors
///
/// Returns a conflict error when the job is not in the `Running` state, and propagates
/// errors from reading the status (including `NoSuchJob`).
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_lifetimes))]
pub async fn set_output_in_pipe<'b, C: ConnectionLike + Send>(
    &self,
    conn: &mut C,
    pipe: &'b mut Pipeline,
    value: &serde_json::Value,
) -> OcyResult<&'b mut Pipeline> {
    let current = self.status(conn).await?;
    if current != job::Status::Running {
        return Err(OcyError::conflict("Can only set output for running jobs"));
    }
    Ok(pipe.hset(&self.key, job::Field::Output, value.to_string()))
}
/// Reads the current status of this job from its `Status` field.
///
/// # Errors
///
/// Returns `OcyError::NoSuchJob` when the field is absent, and propagates Redis errors.
pub async fn status<C: ConnectionLike + Send>(&self, conn: &mut C) -> OcyResult<job::Status> {
    debug!("Fetching job status for job_id={}", self.id);
    let stored: Option<job::Status> = conn.hget(&self.key, job::Field::Status).await?;
    match stored {
        Some(current) => Ok(current),
        None => Err(OcyError::NoSuchJob(self.id)),
    }
}
/// Changes the status of this job.
///
/// The transition is queued by `set_status_in_pipe` and executed inside
/// `transaction_async!`. The job key is always watched; when the new status is `Queued`,
/// the jobs list of the job's queue is watched as well.
///
/// # Errors
///
/// Propagates errors from looking up the queue, from `set_status_in_pipe` (including the
/// conflict error for disallowed transitions) and from executing the pipeline.
pub async fn set_status<C: ConnectionLike + Send>(
    &self,
    conn: &mut C,
    status: &job::Status,
) -> OcyResult<()> {
    let mut watch_keys = vec![self.key.clone()];
    if *status == job::Status::Queued {
        let owning_queue = self.queue(conn).await?;
        watch_keys.push(owning_queue.jobs_key);
    }

    let _: () = transaction_async!(conn, watch_keys.as_slice(), {
        let mut txn = redis::pipe();
        self.set_status_in_pipe(conn, txn.atomic(), status)
            .await?
            .query_async(conn)
            .await?
    });
    info!("[{}] {}", &self.key, status);
    Ok(())
}
/// Queues on `pipe` the commands for moving this job to `status`.
///
/// The job's current status is read from `conn`, and the (current, requested) pair selects
/// the operation to queue:
///
/// * `Running` -> `Completed`: `complete`
/// * `Running` -> `Failed` / `TimedOut`: `fail`
/// * `Running` / `Failed` / `Queued` -> `Cancelled`: `cancel`
/// * `Cancelled` / `Failed` / `TimedOut` -> `Queued`: `requeue` without incrementing retries
///
/// Nothing is executed on `pipe` here; the caller runs it.
///
/// # Errors
///
/// Returns a conflict error for any other transition, and propagates errors from reading
/// the status, `cancel` and `requeue`.
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_lifetimes))]
pub async fn set_status_in_pipe<'b, C: ConnectionLike + Send>(
    &self,
    conn: &mut C,
    pipe: &'b mut Pipeline,
    status: &job::Status,
) -> OcyResult<&'b mut Pipeline> {
    let current_status = self.status(conn).await?;
    debug!(
        "Status update for job_id={}, {} -> {}",
        self.id, &current_status, &status
    );
    Ok(match (current_status, status) {
        (job::Status::Running, job::Status::Completed) => self.complete(pipe),
        (job::Status::Running, cause @ job::Status::Failed) => self.fail(pipe, cause),
        (job::Status::Running, cause @ job::Status::TimedOut) => self.fail(pipe, cause),
        (job::Status::Running, job::Status::Cancelled) => self.cancel(conn, pipe).await?,
        (job::Status::Failed, job::Status::Cancelled) => self.cancel(conn, pipe).await?,
        (job::Status::Queued, job::Status::Cancelled) => self.cancel(conn, pipe).await?,
        (job::Status::Cancelled, job::Status::Queued) => {
            self.requeue(conn, pipe, false).await?
        }
        (job::Status::Failed, job::Status::Queued) => self.requeue(conn, pipe, false).await?,
        (job::Status::TimedOut, job::Status::Queued) => self.requeue(conn, pipe, false).await?,
        // Every other combination is rejected.
        (from, to) => {
            return Err(OcyError::conflict(format!(
                "Cannot change status from {} to {}",
                from, to
            )))
        }
    })
}
/// Records the current time as this job's last heartbeat.
///
/// Runs inside `transaction_async!` with the job key as the watched key.
///
/// # Errors
///
/// Returns a conflict error when the job is not in the `Running` state, and propagates
/// errors from reading the status (including `NoSuchJob`) and from executing the pipeline.
pub async fn update_heartbeat<C: ConnectionLike + Send>(&self, conn: &mut C) -> OcyResult<()> {
    let _: () = transaction_async!(conn, &[&self.key], {
        let current = self.status(conn).await?;
        if current != job::Status::Running {
            return Err(OcyError::conflict(format!("Cannot heartbeat job {}, job is not running", self.id)));
        }

        let mut txn = redis::pipe();
        txn.atomic()
            .hset(&self.key, job::Field::LastHeartbeat, DateTime::now())
            .ignore();
        txn.query_async(conn).await?
    });
    debug!("[{}] updated heartbeat", &self.key);
    Ok(())
}
/// Deletes this job if its expiry metadata says it should expire.
///
/// Runs inside `transaction_async!` with the job key as the watched key. Returns `Ok(true)`
/// when the deletion was executed and `Ok(false)` when the job should not expire.
///
/// # Errors
///
/// Propagates errors from `ExpiryMeta::from_conn`, `delete_in_pipe` and from executing the
/// pipeline.
pub async fn apply_expiry<C: ConnectionLike + Send>(&self, conn: &mut C) -> OcyResult<bool> {
    let expired: bool = transaction_async!(conn, &[&self.key], {
        let should_expire = job::ExpiryMeta::from_conn(conn, &self.key)
            .await?
            .should_expire();
        if !should_expire {
            Some(false)
        } else {
            let mut txn = redis::pipe();
            let outcome: Option<()> = self
                .delete_in_pipe(conn, txn.atomic())
                .await?
                .query_async(conn)
                .await?;
            outcome.map(|()| true)
        }
    });

    if expired {
        info!("[{}] expired, removed from DB", &self.key);
    }
    Ok(expired)
}
/// Marks this job as timed out if its timeout metadata says it has timed out.
///
/// Runs inside `transaction_async!` with the job key as the watched key. Returns `Ok(true)`
/// when the `fail` commands were executed and `Ok(false)` when the job has not timed out.
///
/// # Errors
///
/// Propagates errors from `TimeoutMeta::from_conn` and from executing the pipeline.
pub async fn apply_timeouts<C: ConnectionLike + Send>(&self, conn: &mut C) -> OcyResult<bool> {
    let timed_out: bool = transaction_async!(conn, &[&self.key], {
        let has_timed_out = job::TimeoutMeta::from_conn(conn, &self.key)
            .await?
            .has_timed_out();
        if !has_timed_out {
            Some(false)
        } else {
            let mut txn = redis::pipe();
            let outcome: Option<()> = self
                .fail(txn.atomic(), &job::Status::TimedOut)
                .query_async(conn)
                .await?;
            outcome.map(|()| true)
        }
    });

    if timed_out {
        info!("[{}] timed out", self.key);
    }
    Ok(timed_out)
}
/// Requeues this job if its retry metadata asks for a retry.
///
/// Runs inside `transaction_async!` with the job key and the key of the job's queue as the
/// watched keys. Returns `Ok(true)` when the job was requeued (with its retry counter
/// incremented), and `Ok(false)` when no retry is due or the queue does not exist.
///
/// # Errors
///
/// Propagates errors from looking up the queue, `RetryMeta::from_conn`, the queue existence
/// check, `requeue` and from executing the pipeline.
pub async fn apply_retries<C: ConnectionLike + Send>(&self, conn: &mut C) -> OcyResult<bool> {
    let queue = self.queue(conn).await?;
    let requeued: bool = transaction_async!(conn, &[&self.key, &queue.key], {
        match job::RetryMeta::from_conn(conn, &self.key)
            .await?
            .retry_action()
        {
            job::RetryAction::Retry => {
                let queue_exists = queue.exists(conn).await?;
                if !queue_exists {
                    return Ok(false);
                }
                let mut txn = redis::pipe();
                let outcome: Option<()> = self
                    .requeue(conn, txn.atomic(), true)
                    .await?
                    .query_async(conn)
                    .await?;
                outcome.map(|()| true)
            }
            _ => Some(false),
        }
    });
    Ok(requeued)
}
/// Deletes this job from Redis.
///
/// Runs inside `transaction_async!` with the job key as the watched key. Returns `Ok(true)`
/// when the `DEL` queued by `delete_in_pipe` removed at least one key, and `Ok(false)` when
/// it removed none or the job does not exist.
///
/// # Errors
///
/// Propagates any error other than `NoSuchJob` from `delete_in_pipe`, and errors from
/// executing the pipeline.
pub async fn delete<C: ConnectionLike + Send>(&self, conn: &mut C) -> OcyResult<bool> {
    let deleted: bool = transaction_async!(conn, &[&self.key], {
        let mut txn = redis::pipe();
        let staged = self.delete_in_pipe(conn, txn.atomic()).await;
        match staged {
            Err(OcyError::NoSuchJob(_)) => Some(false),
            Err(other) => return Err(other),
            Ok(ready) => {
                // Only the `DEL` reply is returned; the other queued commands are ignored.
                let (removed,): (Option<u8>,) = ready.query_async(conn).await?;
                removed.map(|count| count > 0)
            }
        }
    });
    Ok(deleted)
}
/// Queues on `pipe` the commands that remove this job and every reference to it.
///
/// Reads the job's `Queue` and `Tags` fields from `conn`, then queues: removal from the
/// queue's jobs list, deletion of the job key, removal from the failed, running and ended
/// lists, and removal of the job ID from the set of each of its tags. Only the reply of the
/// `DEL` command is kept; all other replies are ignored. Nothing is executed on `pipe` here.
///
/// # Errors
///
/// Returns `OcyError::NoSuchJob` when the job hash has no `Queue` field. Propagates Redis
/// errors, errors from `RedisQueue::new` and `build_tag_key`, and a JSON error when the
/// stored `Tags` value is not a JSON array of strings (previously this panicked).
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_lifetimes))]
pub async fn delete_in_pipe<'b, C: ConnectionLike + Send>(
    &self,
    conn: &mut C,
    pipe: &'b mut Pipeline,
) -> OcyResult<&'b mut Pipeline> {
    let (queue, tags): (Option<String>, Option<String>) = conn
        .hget(&self.key, &[job::Field::Queue, job::Field::Tags])
        .await?;

    let queue = match queue {
        Some(queue) => queue,
        None => return Err(OcyError::NoSuchJob(self.id)),
    };
    pipe.lrem(RedisQueue::new(self.redis_manager, &queue)?.jobs_key, 1, self.id)
        .ignore();

    // The `DEL` reply is the only one not ignored, so callers can tell if a key was removed.
    pipe.del(&self.key)
        .lrem(&self.redis_manager.failed_key, 1, self.id)
        .ignore()
        .lrem(&self.redis_manager.running_key, 1, self.id)
        .ignore()
        .lrem(&self.redis_manager.ended_key, 1, self.id)
        .ignore();

    if let Some(tags) = tags {
        // Parse into owned strings: borrowed `&str` cannot represent tags containing JSON
        // escape sequences, and a parse failure must surface as an error, not a panic.
        let tags: Vec<String> = serde_json::from_str(&tags)?;
        for tag in &tags {
            pipe.srem(self.redis_manager.build_tag_key(tag.as_str())?, self.id)
                .ignore();
        }
    }
    Ok(pipe)
}
}