use crate::{client::Client, proto, Error, Result};
use tracing::{debug, trace};
#[derive(Clone, Debug)]
pub struct Job(proto::ActivatedJob);
impl Job {
pub(crate) fn new(proto: proto::ActivatedJob) -> Self {
Job(proto)
}
pub fn key(&self) -> i64 {
self.0.key
}
pub fn job_type(&self) -> &str {
&self.0.r#type
}
pub fn process_instance_key(&self) -> i64 {
self.0.process_instance_key
}
pub fn bpmn_process_id(&self) -> &str {
&self.0.bpmn_process_id
}
pub fn process_definition_version(&self) -> i32 {
self.0.process_definition_version
}
pub fn process_definition_key(&self) -> i64 {
self.0.process_definition_key
}
pub fn element_id(&self) -> &str {
&self.0.element_id
}
pub fn element_instance_key(&self) -> i64 {
self.0.element_instance_key
}
pub fn custom_headers(&self) -> &str {
&self.0.custom_headers
}
pub fn worker(&self) -> &str {
&self.0.worker
}
pub fn retries(&self) -> i32 {
self.0.retries
}
pub fn deadline(&self) -> i64 {
self.0.deadline
}
pub fn variables_str(&self) -> &str {
&self.0.variables
}
pub fn variables(&self) -> serde_json::Value {
serde_json::from_str(&self.0.variables).unwrap_or_else(|_| serde_json::json!({}))
}
pub fn variables_as<'a, T: serde::de::Deserialize<'a>>(&'a self) -> Option<T> {
serde_json::from_str::<'a, T>(&self.0.variables).ok()
}
}
#[derive(Debug)]
pub struct CompleteJobBuilder {
client: Client,
job_key: Option<i64>,
variables: Option<serde_json::Value>,
}
impl CompleteJobBuilder {
pub fn new(client: Client) -> Self {
CompleteJobBuilder {
client,
job_key: None,
variables: None,
}
}
pub fn with_job_key(self, job_key: i64) -> Self {
CompleteJobBuilder {
job_key: Some(job_key),
..self
}
}
pub fn with_variables<T: Into<serde_json::Value>>(self, variables: T) -> Self {
CompleteJobBuilder {
variables: Some(variables.into()),
..self
}
}
#[tracing::instrument(skip(self), name = "complete_job", err)]
pub async fn send(mut self) -> Result<CompleteJobResponse> {
if self.job_key.is_none() && self.client.current_job_key.is_none() {
return Err(Error::InvalidParameters("`job_key` must be set"));
}
let req = proto::CompleteJobRequest {
job_key: self.job_key.or(self.client.current_job_key).unwrap(),
variables: self
.variables
.map_or(String::new(), |vars| vars.to_string()),
};
debug!(job_key = req.job_key, "completing job:");
trace!(?req, "request:");
let res = self
.client
.gateway_client
.complete_job(tonic::Request::new(req))
.await?;
Ok(CompleteJobResponse(res.into_inner()))
}
}
#[derive(Debug)]
pub struct CompleteJobResponse(proto::CompleteJobResponse);
#[derive(Debug)]
pub struct FailJobBuilder {
client: Client,
job_key: Option<i64>,
retries: Option<u32>,
error_message: Option<String>,
}
impl FailJobBuilder {
pub fn new(client: Client) -> Self {
FailJobBuilder {
client,
job_key: None,
retries: None,
error_message: None,
}
}
pub fn with_job_key(self, job_key: i64) -> Self {
FailJobBuilder {
job_key: Some(job_key),
..self
}
}
pub fn with_retries(self, retries: u32) -> Self {
FailJobBuilder {
retries: Some(retries),
..self
}
}
pub fn with_error_message<T: Into<String>>(self, error_message: T) -> Self {
FailJobBuilder {
error_message: Some(error_message.into()),
..self
}
}
#[tracing::instrument(skip(self), name = "fail_job", err)]
pub async fn send(mut self) -> Result<FailJobResponse> {
if self.job_key.is_none() && self.client.current_job_key.is_none() {
return Err(Error::InvalidParameters("`job_key` must be set"));
}
let req = proto::FailJobRequest {
job_key: self.job_key.or(self.client.current_job_key).unwrap(),
retries: self.retries.unwrap_or_default() as i32,
error_message: self.error_message.unwrap_or_default(),
};
debug!(job_key = req.job_key, "failing job:");
trace!(?req, "request:");
let res = self
.client
.gateway_client
.fail_job(tonic::Request::new(req))
.await?;
Ok(FailJobResponse(res.into_inner()))
}
}
#[derive(Debug)]
pub struct FailJobResponse(proto::FailJobResponse);
#[derive(Debug)]
pub struct ThrowErrorBuilder {
client: Client,
job_key: Option<i64>,
error_code: Option<String>,
error_message: Option<String>,
}
impl ThrowErrorBuilder {
pub fn new(client: Client) -> Self {
ThrowErrorBuilder {
client,
job_key: None,
error_code: None,
error_message: None,
}
}
pub fn with_job_key(self, job_key: i64) -> Self {
ThrowErrorBuilder {
job_key: Some(job_key),
..self
}
}
pub fn with_error_code<T: Into<String>>(self, error_code: T) -> Self {
ThrowErrorBuilder {
error_code: Some(error_code.into()),
..self
}
}
pub fn with_error_message<T: Into<String>>(self, error_message: T) -> Self {
ThrowErrorBuilder {
error_message: Some(error_message.into()),
..self
}
}
#[tracing::instrument(skip(self), name = "throw_error", err)]
pub async fn send(mut self) -> Result<ThrowErrorResponse> {
if self.job_key.is_none() && self.client.current_job_key.is_none() {
return Err(Error::InvalidParameters("`job_key` must be set"));
}
let req = proto::ThrowErrorRequest {
job_key: self.job_key.or(self.client.current_job_key).unwrap(),
error_code: self.error_code.unwrap_or_default(),
error_message: self.error_message.unwrap_or_default(),
};
debug!(?req, "sending request:");
let res = self
.client
.gateway_client
.throw_error(tonic::Request::new(req))
.await?;
Ok(ThrowErrorResponse(res.into_inner()))
}
}
#[derive(Debug)]
pub struct ThrowErrorResponse(proto::ThrowErrorResponse);
#[derive(Debug)]
pub struct UpdateJobRetriesBuilder {
client: Client,
job_key: Option<i64>,
retries: Option<u32>,
}
impl UpdateJobRetriesBuilder {
pub fn new(client: Client) -> Self {
UpdateJobRetriesBuilder {
client,
job_key: None,
retries: None,
}
}
pub fn with_job_key(self, job_key: i64) -> Self {
UpdateJobRetriesBuilder {
job_key: Some(job_key),
..self
}
}
pub fn with_retries(self, retries: u32) -> Self {
UpdateJobRetriesBuilder {
retries: Some(retries),
..self
}
}
#[tracing::instrument(skip(self), name = "update_job_retries", err)]
pub async fn send(mut self) -> Result<UpdateJobRetriesResponse> {
if (self.job_key.is_none() && self.client.current_job_key.is_none())
|| self.retries.is_none()
{
return Err(Error::InvalidParameters(
"`job_key` and `retries` must be set",
));
}
let req = proto::UpdateJobRetriesRequest {
job_key: self.job_key.or(self.client.current_job_key).unwrap(),
retries: self.retries.unwrap() as i32,
};
debug!(?req, "sending request:");
let res = self
.client
.gateway_client
.update_job_retries(tonic::Request::new(req))
.await?;
Ok(UpdateJobRetriesResponse(res.into_inner()))
}
}
#[derive(Debug)]
pub struct UpdateJobRetriesResponse(proto::UpdateJobRetriesResponse);