use crate::{proto, Client, Error, Result};
use tokio::{fs::File, io::AsyncReadExt};
use tracing::{debug, trace};
/// Builder for a deploy-workflow request.
///
/// Collects the paths of the resource files to upload together with a
/// resource type; `send` reads every file and submits the contents through
/// the client's gateway connection.
#[derive(Debug)]
pub struct DeployWorkflowBuilder {
/// Client whose `gateway_client` issues the request.
client: Client,
/// Paths of the resource files that `send` opens and uploads.
resource_files: Vec<String>,
/// Resource type reported for every uploaded file.
resource_type: WorkflowResourceType,
}
/// Resource type attached to each file of a deploy-workflow request.
///
/// The explicit discriminants matter: `DeployWorkflowBuilder::send` casts the
/// variant to `i32` when filling the request object's `type` field.
#[derive(Clone, Debug)]
pub enum WorkflowResourceType {
/// Default chosen by `DeployWorkflowBuilder::new`.
/// NOTE(review): presumably tells the gateway to infer the type from the
/// file itself — confirm against the gateway protocol.
File = 0,
/// BPMN resource.
Bpmn = 1,
/// YAML resource; marked deprecated.
#[deprecated]
Yaml = 2,
}
impl DeployWorkflowBuilder {
pub fn new(client: Client) -> Self {
DeployWorkflowBuilder {
client,
resource_files: Vec::new(),
resource_type: WorkflowResourceType::File,
}
}
pub fn with_resource_file<T: Into<String>>(self, resource_file: T) -> Self {
DeployWorkflowBuilder {
resource_files: vec![resource_file.into()],
..self
}
}
pub fn with_resource_files(self, resource_files: Vec<String>) -> Self {
DeployWorkflowBuilder {
resource_files,
..self
}
}
#[deprecated(
note = "As of Zeebe 1.0, YAML support was removed and BPMN is the only supported resource type."
)]
pub fn with_resource_type(self, resource_type: WorkflowResourceType) -> Self {
DeployWorkflowBuilder {
resource_type,
..self
}
}
#[tracing::instrument(skip(self), name = "deploy_workflow")]
pub async fn send(mut self) -> Result<DeployWorkflowResponse> {
trace!(files = ?self.resource_files, resource_type = ?self.resource_type, "reading files");
let mut workflows = Vec::with_capacity(self.resource_files.len());
for path in self.resource_files.iter() {
let mut file = File::open(path).await.map_err(|e| Error::FileIo {
resource_file: path.clone(),
source: e,
})?;
let mut definition = vec![];
file.read_to_end(&mut definition)
.await
.map_err(|e| Error::FileIo {
resource_file: path.clone(),
source: e,
})?;
#[allow(deprecated)]
workflows.push(proto::WorkflowRequestObject {
name: path.clone(),
r#type: self.resource_type.clone() as i32,
definition,
})
}
debug!(files = ?self.resource_files, resource_type = ?self.resource_type, "sending request");
let res = self
.client
.gateway_client
.deploy_workflow(tonic::Request::new(proto::DeployWorkflowRequest {
workflows,
}))
.await?;
Ok(DeployWorkflowResponse(res.into_inner()))
}
}
/// Result of a deploy-workflow request; thin wrapper around the gateway's
/// `proto::DeployWorkflowResponse`.
#[derive(Debug)]
pub struct DeployWorkflowResponse(proto::DeployWorkflowResponse);
impl DeployWorkflowResponse {
pub fn key(&self) -> i64 {
self.0.key
}
pub fn workflows(&self) -> Vec<WorkflowMetadata> {
self.0
.workflows
.iter()
.map(|proto| WorkflowMetadata(proto.clone()))
.collect()
}
}
/// Metadata of a single workflow as returned in a deploy-workflow response;
/// thin wrapper around `proto::WorkflowMetadata`.
#[derive(Debug)]
pub struct WorkflowMetadata(proto::WorkflowMetadata);
impl WorkflowMetadata {
pub fn bpmn_process_id(&self) -> &str {
&self.0.bpmn_process_id
}
pub fn version(&self) -> i32 {
self.0.version
}
pub fn workflow_key(&self) -> i64 {
self.0.workflow_key
}
pub fn resource_name(&self) -> &str {
&self.0.resource_name
}
}
/// Builder for a create-workflow-instance request.
///
/// The workflow is selected either by `workflow_key` or by `bpmn_process_id`
/// (plus `version`); `send` rejects the request when neither is set.
#[derive(Debug)]
pub struct CreateWorkflowInstanceBuilder {
/// Client whose `gateway_client` issues the request.
client: Client,
/// Key of the workflow to instantiate; sent as `0` when unset.
workflow_key: Option<i64>,
/// BPMN process id of the workflow to instantiate; sent as an empty string
/// when unset.
bpmn_process_id: Option<String>,
/// Workflow version; `-1` is the default and is also what
/// `with_latest_version` sets.
version: i32,
/// Variables for the new instance; serialized to a string by `send`, or sent
/// as an empty string when unset.
variables: Option<serde_json::Value>,
}
impl CreateWorkflowInstanceBuilder {
    /// Creates a builder bound to `client`. No workflow is selected yet, the
    /// version defaults to `-1` (the value `with_latest_version` uses) and no
    /// variables are set.
    pub fn new(client: Client) -> Self {
        Self {
            client,
            workflow_key: None,
            bpmn_process_id: None,
            version: -1,
            variables: None,
        }
    }

    /// Selects the workflow to instantiate by its key.
    pub fn with_workflow_key(self, workflow_key: i64) -> Self {
        Self {
            workflow_key: Some(workflow_key),
            ..self
        }
    }

    /// Selects the workflow to instantiate by its BPMN process id.
    pub fn with_bpmn_process_id<T: Into<String>>(self, bpmn_process_id: T) -> Self {
        Self {
            bpmn_process_id: Some(bpmn_process_id.into()),
            ..self
        }
    }

    /// Sets the workflow version to instantiate.
    pub fn with_version(self, version: i32) -> Self {
        Self { version, ..self }
    }

    /// Resets the version to `-1`, the same value `new` starts with.
    pub fn with_latest_version(self) -> Self {
        Self {
            version: -1,
            ..self
        }
    }

    /// Sets the variables passed to the new workflow instance.
    pub fn with_variables<T: Into<serde_json::Value>>(self, variables: T) -> Self {
        Self {
            variables: Some(variables.into()),
            ..self
        }
    }

    /// Validates the builder and submits the create-workflow-instance request.
    ///
    /// Unset optional values are sent as `0` (`workflow_key`) or as an empty
    /// string (`bpmn_process_id`, `variables`).
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidParameters` when neither `workflow_key` nor
    /// `bpmn_process_id` has been set, and propagates any error from the
    /// gateway call.
    #[tracing::instrument(skip(self), name = "create_workflow_instance")]
    pub async fn send(mut self) -> Result<CreateWorkflowInstanceResponse> {
        if self.workflow_key.is_none() && self.bpmn_process_id.is_none() {
            return Err(Error::InvalidParameters(
                "`workflow_key` or `bpmn_process_id` must be set",
            ));
        }
        let req = proto::CreateWorkflowInstanceRequest {
            workflow_key: self.workflow_key.unwrap_or(0),
            bpmn_process_id: self.bpmn_process_id.unwrap_or_default(),
            version: self.version,
            variables: self
                .variables
                .map(|vars| vars.to_string())
                .unwrap_or_default(),
        };
        debug!(?req, "sending request:");
        let res = self
            .client
            .gateway_client
            .create_workflow_instance(tonic::Request::new(req))
            .await?;
        Ok(CreateWorkflowInstanceResponse(res.into_inner()))
    }
}
/// Result of a create-workflow-instance request; thin wrapper around
/// `proto::CreateWorkflowInstanceResponse`.
#[derive(Debug)]
pub struct CreateWorkflowInstanceResponse(proto::CreateWorkflowInstanceResponse);
impl CreateWorkflowInstanceResponse {
pub fn workflow_key(&self) -> i64 {
self.0.workflow_key
}
pub fn bpmn_process_id(&self) -> &str {
&self.0.bpmn_process_id
}
pub fn version(&self) -> i32 {
self.0.version
}
pub fn workflow_instance_key(&self) -> i64 {
self.0.workflow_instance_key
}
}
/// Builder for a create-workflow-instance-with-result request.
///
/// Carries the same workflow selection and variables as
/// `CreateWorkflowInstanceBuilder`, plus a request timeout and the list of
/// variables to fetch.
#[derive(Debug)]
pub struct CreateWorkflowInstanceWithResultBuilder {
/// Client whose `gateway_client` issues the request.
client: Client,
/// Key of the workflow to instantiate; sent as `0` when unset.
workflow_key: Option<i64>,
/// BPMN process id of the workflow to instantiate; sent as an empty string
/// when unset.
bpmn_process_id: Option<String>,
/// Workflow version; `-1` is the default and is also what
/// `with_latest_version` sets.
version: i32,
/// Variables for the new instance; serialized to a string by `send`, or sent
/// as an empty string when unset.
variables: Option<serde_json::Value>,
/// Request timeout forwarded to the gateway; defaults to `0`.
/// NOTE(review): unit is presumably milliseconds — confirm against the
/// gateway protocol.
request_timeout: u64,
/// Forwarded as the request's `fetch_variables` list; empty by default.
fetch_variables: Vec<String>,
}
impl CreateWorkflowInstanceWithResultBuilder {
    /// Creates a builder bound to `client`. No workflow is selected yet, the
    /// version defaults to `-1` (the value `with_latest_version` uses), no
    /// variables are set, the request timeout is `0` and the fetch list is
    /// empty.
    pub fn new(client: Client) -> Self {
        Self {
            client,
            workflow_key: None,
            bpmn_process_id: None,
            version: -1,
            variables: None,
            request_timeout: 0,
            fetch_variables: Vec::new(),
        }
    }

    /// Selects the workflow to instantiate by its key.
    pub fn with_workflow_key(self, workflow_key: i64) -> Self {
        Self {
            workflow_key: Some(workflow_key),
            ..self
        }
    }

    /// Selects the workflow to instantiate by its BPMN process id.
    pub fn with_bpmn_process_id<T: Into<String>>(self, bpmn_process_id: T) -> Self {
        Self {
            bpmn_process_id: Some(bpmn_process_id.into()),
            ..self
        }
    }

    /// Sets the workflow version to instantiate.
    pub fn with_version(self, version: i32) -> Self {
        Self { version, ..self }
    }

    /// Resets the version to `-1`, the same value `new` starts with.
    pub fn with_latest_version(self) -> Self {
        Self {
            version: -1,
            ..self
        }
    }

    /// Sets the variables passed to the new workflow instance.
    pub fn with_variables<T: Into<serde_json::Value>>(self, variables: T) -> Self {
        Self {
            variables: Some(variables.into()),
            ..self
        }
    }

    /// Sets the list forwarded as the request's `fetch_variables`.
    pub fn with_fetch_variables(self, fetch_variables: Vec<String>) -> Self {
        Self {
            fetch_variables,
            ..self
        }
    }

    /// Sets the request timeout forwarded to the gateway.
    ///
    /// Values above `i64::MAX` are clamped to `i64::MAX` when the request is
    /// built, because the wire field is a signed 64-bit integer.
    pub fn with_request_timeout(self, request_timeout: u64) -> Self {
        Self {
            request_timeout,
            ..self
        }
    }

    /// Validates the builder and submits the create-workflow-instance-with-result
    /// request.
    ///
    /// Unset optional values are sent as `0` (`workflow_key`) or as an empty
    /// string (`bpmn_process_id`, `variables`).
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidParameters` when neither `workflow_key` nor
    /// `bpmn_process_id` has been set, and propagates any error from the
    /// gateway call.
    #[tracing::instrument(skip(self), name = "create_workflow_instance_with_result")]
    pub async fn send(mut self) -> Result<CreateWorkflowInstanceWithResultResponse> {
        if self.workflow_key.is_none() && self.bpmn_process_id.is_none() {
            return Err(Error::InvalidParameters(
                "`workflow_key` or `bpmn_process_id` must be set",
            ));
        }
        // A plain `as i64` would wrap values above `i64::MAX` into a negative
        // timeout; saturate instead so an oversized value stays "very long".
        let request_timeout = self.request_timeout.min(i64::MAX as u64) as i64;
        let req = proto::CreateWorkflowInstanceWithResultRequest {
            request: Some(proto::CreateWorkflowInstanceRequest {
                workflow_key: self.workflow_key.unwrap_or(0),
                bpmn_process_id: self.bpmn_process_id.unwrap_or_default(),
                version: self.version,
                variables: self
                    .variables
                    .map(|vars| vars.to_string())
                    .unwrap_or_default(),
            }),
            request_timeout,
            fetch_variables: self.fetch_variables,
        };
        debug!(?req, "sending request:");
        let res = self
            .client
            .gateway_client
            .create_workflow_instance_with_result(tonic::Request::new(req))
            .await?;
        Ok(CreateWorkflowInstanceWithResultResponse(res.into_inner()))
    }
}
/// Result of a create-workflow-instance-with-result request; thin wrapper
/// around `proto::CreateWorkflowInstanceWithResultResponse`.
#[derive(Debug)]
pub struct CreateWorkflowInstanceWithResultResponse(
proto::CreateWorkflowInstanceWithResultResponse,
);
impl CreateWorkflowInstanceWithResultResponse {
    /// Returns the `workflow_key` field of the wrapped response.
    pub fn workflow_key(&self) -> i64 {
        let Self(inner) = self;
        inner.workflow_key
    }

    /// Returns the `bpmn_process_id` field of the wrapped response.
    pub fn bpmn_process_id(&self) -> &str {
        let Self(inner) = self;
        &inner.bpmn_process_id
    }

    /// Returns the `version` field of the wrapped response.
    pub fn version(&self) -> i32 {
        let Self(inner) = self;
        inner.version
    }

    /// Returns the `workflow_instance_key` field of the wrapped response.
    pub fn workflow_instance_key(&self) -> i64 {
        let Self(inner) = self;
        inner.workflow_instance_key
    }

    /// Returns the raw, unparsed `variables` string of the wrapped response.
    pub fn variables_str(&self) -> &str {
        let Self(inner) = self;
        &inner.variables
    }

    /// Parses the `variables` string as JSON.
    ///
    /// Parsing is best-effort: when the string is not valid JSON an empty JSON
    /// object is returned instead of an error.
    pub fn variables(&self) -> serde_json::Value {
        match serde_json::from_str::<serde_json::Value>(self.variables_str()) {
            Ok(parsed) => parsed,
            Err(_) => serde_json::json!({}),
        }
    }

    /// Deserializes the `variables` string into `T`, returning `None` when
    /// deserialization fails. `T` may borrow from the response.
    pub fn variables_as<'a, T: serde::de::Deserialize<'a>>(&'a self) -> Option<T> {
        let raw: &'a str = self.variables_str();
        serde_json::from_str::<'a, T>(raw).ok()
    }
}
/// Builder for a cancel-workflow-instance request.
#[derive(Debug)]
pub struct CancelWorkflowInstanceBuilder {
/// Client whose `gateway_client` issues the request.
client: Client,
/// Key of the workflow instance to cancel; `send` fails when it is unset.
workflow_instance_key: Option<i64>,
}
impl CancelWorkflowInstanceBuilder {
pub fn new(client: Client) -> Self {
CancelWorkflowInstanceBuilder {
client,
workflow_instance_key: None,
}
}
pub fn with_workflow_instance_key(self, workflow_key: i64) -> Self {
CancelWorkflowInstanceBuilder {
workflow_instance_key: Some(workflow_key),
..self
}
}
#[tracing::instrument(skip(self), name = "cancel_workflow_instance")]
pub async fn send(mut self) -> Result<CancelWorkflowInstanceResponse> {
if self.workflow_instance_key.is_none() {
return Err(Error::InvalidParameters(
"`workflow_instance_key` must be set",
));
}
let req = proto::CancelWorkflowInstanceRequest {
workflow_instance_key: self.workflow_instance_key.unwrap(),
};
debug!(?req, "sending request:");
let res = self
.client
.gateway_client
.cancel_workflow_instance(tonic::Request::new(req))
.await?;
Ok(CancelWorkflowInstanceResponse(res.into_inner()))
}
}
/// Result of a cancel-workflow-instance request; thin wrapper around
/// `proto::CancelWorkflowInstanceResponse` that exposes no accessors here.
#[derive(Debug)]
pub struct CancelWorkflowInstanceResponse(proto::CancelWorkflowInstanceResponse);
/// Builder for a set-variables request.
#[derive(Debug)]
pub struct SetVariablesBuilder {
/// Client whose `gateway_client` issues the request.
client: Client,
/// Key of the element instance to update; `send` fails when it is unset.
element_instance_key: Option<i64>,
/// Variables to set; serialized to a string by `send`, or sent as an empty
/// string when unset.
variables: Option<serde_json::Value>,
/// Forwarded as the request's `local` flag; defaults to `false`.
/// NOTE(review): presumably restricts the update to the given element's
/// scope — confirm against the gateway protocol.
local: bool,
}
impl SetVariablesBuilder {
pub fn new(client: Client) -> Self {
SetVariablesBuilder {
client,
element_instance_key: None,
variables: None,
local: false,
}
}
pub fn with_element_instance_key(self, element_instance_key: i64) -> Self {
SetVariablesBuilder {
element_instance_key: Some(element_instance_key),
..self
}
}
pub fn with_variables<T: Into<serde_json::Value>>(self, variables: T) -> Self {
SetVariablesBuilder {
variables: Some(variables.into()),
..self
}
}
pub fn with_local(self, local: bool) -> Self {
SetVariablesBuilder { local, ..self }
}
#[tracing::instrument(skip(self), name = "set_variables")]
pub async fn send(mut self) -> Result<SetVariablesResponse> {
if self.element_instance_key.is_none() {
return Err(Error::InvalidParameters(
"`element_instance_key` must be set",
));
}
let req = proto::SetVariablesRequest {
element_instance_key: self.element_instance_key.unwrap(),
variables: self
.variables
.map_or(String::new(), |vars| vars.to_string()),
local: self.local,
};
debug!(?req, "sending request:");
let res = self
.client
.gateway_client
.set_variables(tonic::Request::new(req))
.await?;
Ok(SetVariablesResponse(res.into_inner()))
}
}
/// Result of a set-variables request; thin wrapper around
/// `proto::SetVariablesResponse`.
#[derive(Debug)]
pub struct SetVariablesResponse(proto::SetVariablesResponse);
impl SetVariablesResponse {
pub fn key(&self) -> i64 {
self.0.key
}
}