use chrono::{DateTime, Utc};
use derive_builder::Builder;
use std::collections::HashMap;
use std::io::prelude::*;
mod cmd;
mod resp;
mod utils;
#[cfg(feature = "ent")]
mod ent;
use crate::error::Error;
pub use self::cmd::*;
pub use self::resp::*;
/// Queue name the builder assigns to a job when no queue is set explicitly.
const JOB_DEFAULT_QUEUE: &str = "default";
/// Value the builder uses for `Job::reserve_for` when none is set
/// (per the constant's name, a number of seconds).
const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600;
/// Value the builder uses for `Job::retry` when none is set.
const JOB_DEFAULT_RETRY_COUNT: isize = 25;
/// Value the builder uses for `Job::priority` when none is set.
const JOB_DEFAULT_PRIORITY: u8 = 5;
/// Value the builder uses for `Job::backtrace` when none is set.
const JOB_DEFAULT_BACKTRACE: usize = 0;
/// A job: its type, arguments and scheduling metadata.
///
/// The struct is (de)serializable through serde, and `derive_builder` generates a
/// `JobBuilder` for it. The builder has a custom constructor (`JobBuilder::new`),
/// its setters accept `impl Into<_>`, and the generated build function is the
/// private `try_build`, which `JobBuilder::build` wraps.
///
/// NOTE(review): the meaning attached to individual fields below follows their
/// names and presumably mirrors the Faktory job payload — confirm against the
/// protocol documentation.
#[derive(Serialize, Deserialize, Debug, Builder)]
#[builder(
custom_constructor,
setter(into),
build_fn(name = "try_build", private)
)]
pub struct Job {
/// Identifier of the job.
///
/// The builder fills it with `utils::gen_random_jid()` unless a value is supplied.
#[builder(default = "utils::gen_random_jid()")]
pub(crate) jid: String,
/// Name of the queue for this job; the builder defaults it to `"default"`.
#[builder(default = "JOB_DEFAULT_QUEUE.into()")]
pub queue: String,
/// The job's type, serialized under the key `jobtype`.
///
/// It has a hand-written setter path: the value is provided to `JobBuilder::new`.
#[serde(rename = "jobtype")]
#[builder(setter(custom))]
pub(crate) kind: String,
/// Arguments of the job as JSON values.
///
/// Set through the hand-written `JobBuilder::args`; empty when never set.
#[builder(setter(custom), default = "Vec::new()")]
pub(crate) args: Vec<serde_json::Value>,
/// Creation timestamp; the builder defaults it to the current UTC time.
///
/// Left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(Utc::now())")]
pub created_at: Option<DateTime<Utc>>,
/// Enqueue timestamp.
///
/// No builder setter is generated, so a freshly built job has `None` here.
/// Left out of the serialized output when `None`.
/// NOTE(review): presumably filled in by the server — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(setter(skip))]
pub enqueued_at: Option<DateTime<Utc>>,
/// Optional timestamp, `None` by default and left out of the serialized output
/// when `None`.
/// NOTE(review): presumably the time at which the job should run — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "None")]
pub at: Option<DateTime<Utc>>,
/// Reservation period; the builder defaults it to
/// `Some(JOB_DEFAULT_RESERVED_FOR_SECS)` (600).
///
/// Left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(JOB_DEFAULT_RESERVED_FOR_SECS)")]
pub reserve_for: Option<usize>,
/// Retry count; the builder defaults it to `Some(JOB_DEFAULT_RETRY_COUNT)` (25).
///
/// Left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(JOB_DEFAULT_RETRY_COUNT)")]
pub retry: Option<isize>,
/// Priority; the builder defaults it to `Some(JOB_DEFAULT_PRIORITY)` (5).
///
/// Unlike the neighbouring optional fields, this one carries no
/// `skip_serializing_if`, so it is serialized even when `None`.
#[builder(default = "Some(JOB_DEFAULT_PRIORITY)")]
pub priority: Option<u8>,
/// Backtrace setting; the builder defaults it to `Some(JOB_DEFAULT_BACKTRACE)` (0).
///
/// Left out of the serialized output when `None`.
/// NOTE(review): presumably the number of backtrace lines to retain — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
#[builder(default = "Some(JOB_DEFAULT_BACKTRACE)")]
pub backtrace: Option<usize>,
/// Failure details, readable through `Job::failure`.
///
/// Never serialized, and no builder setter is generated, so a freshly built
/// job has `None` here; the value can only come from deserialization.
#[serde(skip_serializing)]
#[builder(setter(skip))]
failure: Option<Failure>,
/// Arbitrary extra key/value data attached to the job.
///
/// Left out of the serialized output when empty, and deserialized as an empty
/// map when the key is missing. Entries can be added one at a time with
/// `JobBuilder::add_to_custom_data`.
#[serde(skip_serializing_if = "HashMap::is_empty")]
#[serde(default = "HashMap::default")]
#[builder(default = "HashMap::default()")]
pub custom: HashMap<String, serde_json::Value>,
}
impl JobBuilder {
    /// Starts a builder for a job of the given `kind`.
    ///
    /// All other builder fields start out unset, so they take their defaults
    /// when the job is built.
    pub fn new(kind: impl Into<String>) -> JobBuilder {
        let mut builder = JobBuilder::create_empty();
        builder.kind = Some(kind.into());
        builder
    }

    /// Sets the job's arguments, converting every element into a
    /// `serde_json::Value`. Any previously set arguments are replaced.
    pub fn args<A>(&mut self, args: Vec<A>) -> &mut Self
    where
        A: Into<serde_json::Value>,
    {
        let converted: Vec<serde_json::Value> = args.into_iter().map(Into::into).collect();
        self.args = Some(converted);
        self
    }

    /// Inserts a single key/value pair into the job's custom data.
    ///
    /// The map is created on first use; inserting an existing key overwrites
    /// its previous value.
    pub fn add_to_custom_data(
        &mut self,
        k: impl Into<String>,
        v: impl Into<serde_json::Value>,
    ) -> &mut Self {
        self.custom
            .get_or_insert_with(HashMap::new)
            .insert(k.into(), v.into());
        self
    }

    /// Builds the [`Job`] from the current state of the builder.
    ///
    /// # Panics
    ///
    /// Panics if the generated `try_build` reports an error. `kind` is the
    /// only field without a default and `JobBuilder::new` always sets it.
    pub fn build(&self) -> Job {
        let built = self.try_build();
        built.expect("All required fields have been set.")
    }
}
/// Failure details attached to a `Job` (see `Job::failure`).
///
/// All fields are private; optional fields are left out of the serialized
/// output when they are `None`.
///
/// NOTE(review): presumably populated by the Faktory server after a job
/// execution failed — confirm against the protocol documentation.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Failure {
/// Retry counter recorded with the failure.
retry_count: usize,
/// Time of the failure, kept as the raw string (it is not parsed into a timestamp here).
failed_at: String,
/// Optional raw time string.
/// NOTE(review): presumably when the next attempt is scheduled — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
next_at: Option<String>,
/// Optional failure message.
#[serde(skip_serializing_if = "Option::is_none")]
message: Option<String>,
/// Optional error type, (de)serialized under the key `errtype`.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "errtype")]
kind: Option<String>,
/// Optional backtrace, one string per entry.
#[serde(skip_serializing_if = "Option::is_none")]
backtrace: Option<Vec<String>>,
}
impl Job {
pub fn new<S, A>(kind: S, args: Vec<A>) -> Self
where
S: Into<String>,
A: Into<serde_json::Value>,
{
JobBuilder::new(kind).args(args).build()
}
pub fn builder<S: Into<String>>(kind: S) -> JobBuilder {
JobBuilder::new(kind)
}
pub fn on_queue<S: Into<String>>(mut self, queue: S) -> Self {
self.queue = queue.into();
self
}
pub fn id(&self) -> &str {
&self.jid
}
pub fn kind(&self) -> &str {
&self.kind
}
pub fn args(&self) -> &[serde_json::Value] {
&self.args
}
pub fn failure(&self) -> &Option<Failure> {
&self.failure
}
}
pub fn write_command<W: Write, C: FaktoryCommand>(w: &mut W, command: &C) -> Result<(), Error> {
command.issue::<W>(w)?;
Ok(w.flush()?)
}
pub fn write_command_and_await_ok<X: BufRead + Write, C: FaktoryCommand>(
x: &mut X,
command: &C,
) -> Result<(), Error> {
write_command(x, command)?;
read_ok(x)
}
#[cfg(test)]
mod test {
    use super::*;

    /// A job built with only `kind` and `args` gets the documented defaults for
    /// every other field; without `args` the argument list is empty.
    #[test]
    fn test_job_can_be_created_with_builder() {
        let job_kind = "order";
        let job_args = vec!["ISBN-13:9781718501850"];
        let job = JobBuilder::new(job_kind).args(job_args.clone()).build();

        assert!(!job.jid.is_empty());
        assert_eq!(job.queue, JOB_DEFAULT_QUEUE);
        assert_eq!(job.kind, job_kind);
        assert_eq!(job.args, job_args);

        assert!(job.created_at.is_some());
        assert!(job.created_at < Some(Utc::now()));
        assert!(job.enqueued_at.is_none());
        assert!(job.at.is_none());

        assert_eq!(job.reserve_for, Some(JOB_DEFAULT_RESERVED_FOR_SECS));
        assert_eq!(job.retry, Some(JOB_DEFAULT_RETRY_COUNT));
        assert_eq!(job.priority, Some(JOB_DEFAULT_PRIORITY));
        assert_eq!(job.backtrace, Some(JOB_DEFAULT_BACKTRACE));
        assert!(job.failure.is_none());
        assert_eq!(job.custom, HashMap::default());

        let job = JobBuilder::new(job_kind).build();
        assert!(job.args.is_empty());
    }

    /// `Job::new`, `JobBuilder::new` and `Job::builder` produce equivalent jobs
    /// that differ only in their generated id and creation timestamp.
    #[test]
    fn test_all_job_creation_variants_align() {
        let job1 = Job::new("order", vec!["ISBN-13:9781718501850"]);
        let job2 = JobBuilder::new("order")
            .args(vec!["ISBN-13:9781718501850"])
            .build();

        assert_eq!(job1.kind, job2.kind);
        assert_eq!(job1.args, job2.args);
        assert_eq!(job1.queue, job2.queue);
        assert_eq!(job1.enqueued_at, job2.enqueued_at);
        assert_eq!(job1.at, job2.at);
        assert_eq!(job1.reserve_for, job2.reserve_for);
        assert_eq!(job1.retry, job2.retry);
        assert_eq!(job1.priority, job2.priority);
        assert_eq!(job1.backtrace, job2.backtrace);
        assert_eq!(job1.custom, job2.custom);

        assert_ne!(job1.jid, job2.jid);
        assert_ne!(job1.created_at, job2.created_at);

        let job3 = Job::builder("order")
            .args(vec!["ISBN-13:9781718501850"])
            .build();

        assert_eq!(job2.kind, job3.kind);
        // Compare against `job3`: this used to re-check `job1` vs `job2`,
        // leaving the arguments of the `Job::builder` variant unverified.
        assert_eq!(job2.args, job3.args);

        assert_ne!(job2.jid, job3.jid);
        assert_ne!(job2.created_at, job3.created_at);
    }

    /// `add_to_custom_data` stores the converted value under the given key.
    #[test]
    fn test_arbitrary_custom_data_setter() {
        let job = JobBuilder::new("order")
            .args(vec!["ISBN-13:9781718501850"])
            .add_to_custom_data("arbitrary_key", "arbitrary_value")
            .build();

        assert_eq!(
            job.custom.get("arbitrary_key").unwrap(),
            &serde_json::Value::from("arbitrary_value")
        );
    }
}