use crate::gateway;
use crate::gateway_grpc::*;
use futures::compat::{Future01CompatExt, Stream01CompatExt};
use futures::future::{Future, TryFutureExt};
use futures::stream::{Stream, TryStreamExt};
use grpc::ClientStubExt;
use std::sync::Arc;
use crate::gateway::TopologyResponse;
use serde::Serialize;
#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "Gateway Error. {:?}", _0)]
GatewayError(grpc::Error),
#[fail(display = "Topology Error. {:?}", _0)]
TopologyError(grpc::Error),
#[fail(display = "List Workflows Error. {:?}", _0)]
ListWorkflowsError(grpc::Error),
#[fail(display = "Deploy Workflow Error. {:?}", _0)]
DeployWorkflowError(grpc::Error),
#[fail(display = "Create Workflow Instance Error. {:?}", _0)]
CreateWorkflowInstanceError(grpc::Error),
#[fail(display = "Activate Job Error. {:?}", _0)]
ActivateJobError(grpc::Error),
#[fail(display = "Complete Job Error. {:?}", _0)]
CompleteJobError(grpc::Error),
#[fail(display = "Publish Message Error. {:?}", _0)]
PublishMessageError(grpc::Error),
#[fail(display = "Fail Job Error. {:?}", _0)]
FailJobError(grpc::Error),
#[cfg(feature = "timer")]
#[fail(display = "Interval Error. {:?}", _0)]
IntervalError(tokio::timer::Error),
#[fail(display = "Job Error: {}", _0)]
JobError(String),
#[fail(display = "Json Payload Serialization Error. {:?}", _0)]
JsonError(serde_json::error::Error),
}
pub enum WorkflowVersion {
Latest,
Version(i32),
}
impl Into<i32> for WorkflowVersion {
fn into(self) -> i32 {
match self {
WorkflowVersion::Latest => -1,
WorkflowVersion::Version(v) => v,
}
}
}
#[derive(Clone)]
pub struct Client {
pub gateway_client: Arc<dyn Gateway + Send + Sync>,
}
impl Client {
pub fn new(host: &str, port: u16) -> Result<Self, Error> {
GatewayClient::new_plain(host, port, Default::default())
.map_err(|e| Error::GatewayError(e))
.map(Arc::new)
.map(|gateway_client| Client { gateway_client })
}
pub fn topology(&self) -> impl Future<Output = Result<Topology, Error>> {
self.gateway_client
.topology(Default::default(), Default::default())
.drop_metadata()
.compat()
.map_ok(|tr| Topology::new(tr))
.map_err(|e| Error::TopologyError(e))
}
pub fn deploy_bpmn_workflow<S: Into<String>>(
&self,
workflow_name: S,
workflow_definition: Vec<u8>,
) -> impl Future<Output = Result<DeployedWorkflows, Error>> {
let mut workflow_request_object = gateway::WorkflowRequestObject::default();
workflow_request_object.set_name(workflow_name.into());
workflow_request_object.set_definition(workflow_definition);
workflow_request_object.set_field_type(gateway::WorkflowRequestObject_ResourceType::BPMN);
let mut deploy_workflow_request = gateway::DeployWorkflowRequest::default();
deploy_workflow_request
.set_workflows(protobuf::RepeatedField::from(vec![workflow_request_object]));
self.gateway_client
.deploy_workflow(Default::default(), deploy_workflow_request)
.drop_metadata()
.compat()
.map_err(|e| Error::DeployWorkflowError(e))
.map_ok(|dwr| DeployedWorkflows::new(dwr))
}
pub fn create_workflow_instance(
&self,
workflow_instance: WorkflowInstance,
) -> impl Future<Output = Result<CreatedWorkflowInstance, Error>> {
self.gateway_client
.create_workflow_instance(Default::default(), workflow_instance.into())
.drop_metadata()
.compat()
.map_err(|e| Error::CreateWorkflowInstanceError(e))
.map_ok(|cwr| CreatedWorkflowInstance::new(cwr))
}
pub fn activate_jobs(
&self,
jobs_config: ActivateJobs,
) -> impl Stream<Item = Result<ActivatedJobs, Error>> + Send {
self.gateway_client
.activate_jobs(Default::default(), jobs_config.into())
.drop_metadata()
.compat()
.map_err(|e| Error::ActivateJobError(e))
.map_ok(|ajr| ActivatedJobs::new(ajr))
}
pub fn complete_job(
&self,
complete_job: CompleteJob,
) -> impl Future<Output = Result<(), Error>> + Send {
self.gateway_client
.complete_job(Default::default(), complete_job.into())
.drop_metadata()
.compat()
.map_err(|e| Error::CompleteJobError(e))
.map_ok(|_| ())
}
pub fn fail_job(
&self,
job_key: i64,
retries: i32,
error_message: String,
) -> impl Future<Output = Result<(), Error>> + Send {
let request_options = Default::default();
let mut request = gateway::FailJobRequest::default();
request.set_jobKey(job_key);
request.set_retries(retries);
request.set_errorMessage(error_message);
self.gateway_client
.fail_job(request_options, request)
.drop_metadata()
.compat()
.map_ok(|_| ())
.map_err(|e| Error::FailJobError(e))
}
pub fn publish_message(
&self,
publish_message: PublishMessage,
) -> impl Future<Output = Result<(), Error>> {
self.gateway_client
.publish_message(Default::default(), publish_message.into())
.drop_metadata()
.compat()
.map_err(|e| Error::PublishMessageError(e))
.map_ok(|_| ())
}
}
#[derive(Debug)]
pub struct Topology {
pub brokers: Vec<BrokerInfo>,
}
impl Topology {
pub fn new(topology_response: TopologyResponse) -> Topology {
Self {
brokers: topology_response
.brokers
.into_iter()
.map(From::from)
.collect(),
}
}
}
impl From<gateway::TopologyResponse> for Topology {
fn from(tr: gateway::TopologyResponse) -> Self {
Self {
brokers: tr.brokers.into_iter().map(From::from).collect(),
}
}
}
#[derive(Debug)]
pub struct BrokerInfo {
pub node_id: i32,
pub host: String,
pub port: i32,
pub partitions: Vec<Partition>,
}
impl From<gateway::BrokerInfo> for BrokerInfo {
fn from(bi: gateway::BrokerInfo) -> Self {
Self {
node_id: bi.nodeId,
host: bi.host,
port: bi.port,
partitions: bi.partitions.into_iter().map(From::from).collect(),
}
}
}
#[derive(Debug)]
pub struct Partition {
pub partition_id: i32,
pub role: BrokerRole,
}
impl From<gateway::Partition> for Partition {
fn from(p: gateway::Partition) -> Self {
Self {
partition_id: p.partitionId,
role: p.role.into(),
}
}
}
#[derive(Debug)]
pub enum BrokerRole {
LEADER = 0,
FOLLOWER = 1,
}
impl From<gateway::Partition_PartitionBrokerRole> for BrokerRole {
fn from(pbr: gateway::Partition_PartitionBrokerRole) -> Self {
match pbr {
gateway::Partition_PartitionBrokerRole::FOLLOWER => BrokerRole::FOLLOWER,
gateway::Partition_PartitionBrokerRole::LEADER => BrokerRole::LEADER,
}
}
}
#[derive(Debug)]
pub struct DeployedWorkflows {
pub key: i64,
pub workflows: Vec<Workflow>,
}
impl DeployedWorkflows {
pub fn new(deploy_workflow_response: gateway::DeployWorkflowResponse) -> DeployedWorkflows {
Self {
key: deploy_workflow_response.key,
workflows: deploy_workflow_response
.workflows
.into_iter()
.map(From::from)
.collect(),
}
}
}
#[derive(Debug)]
pub struct Workflow {
pub bpmn_process_id: String,
pub version: i32,
pub workflow_key: i64,
pub resource_name: String,
}
impl From<gateway::WorkflowMetadata> for Workflow {
fn from(wm: gateway::WorkflowMetadata) -> Self {
Self {
bpmn_process_id: wm.bpmnProcessId,
version: wm.version,
workflow_key: wm.workflowKey,
resource_name: wm.resourceName,
}
}
}
#[derive(Debug)]
pub struct CreatedWorkflowInstance {
workflow_key: i64,
bpmn_process_id: String,
version: i32,
workflow_instance_key: i64,
}
impl CreatedWorkflowInstance {
pub fn new(cwir: gateway::CreateWorkflowInstanceResponse) -> Self {
Self {
workflow_key: cwir.workflowKey,
bpmn_process_id: cwir.bpmnProcessId,
version: cwir.version,
workflow_instance_key: cwir.workflowInstanceKey,
}
}
}
enum WorkflowId {
BpmnProcessId(String, WorkflowVersion),
WorkflowKey(i64),
}
pub struct WorkflowInstance {
id: WorkflowId,
variables: Option<String>,
}
impl WorkflowInstance {
pub fn workflow_instance_with_bpmn_process<S: Into<String>>(
bpmn_process_id: S,
version: WorkflowVersion,
) -> Self {
WorkflowInstance {
id: WorkflowId::BpmnProcessId(bpmn_process_id.into(), version),
variables: None,
}
}
pub fn workflow_instance_with_workflow_key(workflow_key: i64) -> Self {
WorkflowInstance {
id: WorkflowId::WorkflowKey(workflow_key),
variables: None,
}
}
pub fn variables<S: Serialize>(mut self, variables: &S) -> Result<Self, serde_json::Error> {
serde_json::to_string(variables).map(move |v| {
self.variables = Some(v);
self
})
}
}
impl Into<gateway::CreateWorkflowInstanceRequest> for WorkflowInstance {
fn into(self) -> gateway::CreateWorkflowInstanceRequest {
let mut request = gateway::CreateWorkflowInstanceRequest::default();
match self.id {
WorkflowId::BpmnProcessId(bpmn_process_id, version) => {
request.set_version(version.into());
request.set_bpmnProcessId(bpmn_process_id);
}
WorkflowId::WorkflowKey(key) => {
request.set_workflowKey(key);
}
}
if let Some(variables) = self.variables {
request.set_variables(variables);
}
request
}
}
pub struct PublishMessage {
name: String,
correlation_key: String,
time_to_live: i64,
message_id: String,
variables: Option<String>,
}
impl PublishMessage {
pub fn new<S1: Into<String>, S2: Into<String>, S3: Into<String>>(
name: S1,
correlation_key: S2,
time_to_live: i64,
message_id: S3,
) -> Self {
PublishMessage {
name: name.into(),
correlation_key: correlation_key.into(),
time_to_live,
message_id: message_id.into(),
variables: None,
}
}
pub fn variables<S: Serialize>(mut self, variables: &S) -> Result<Self, Error> {
serde_json::to_string(variables)
.map_err(|e| Error::JsonError(e))
.map(move |v| {
self.variables = Some(v);
self
})
}
}
impl Into<gateway::PublishMessageRequest> for PublishMessage {
fn into(self) -> gateway::PublishMessageRequest {
let mut publish_message_request = gateway::PublishMessageRequest::default();
if let Some(variables) = self.variables {
publish_message_request.set_variables(variables);
}
publish_message_request.set_name(self.name);
publish_message_request.set_timeToLive(self.time_to_live);
publish_message_request.set_messageId(self.message_id);
publish_message_request.set_correlationKey(self.correlation_key);
publish_message_request
}
}
#[derive(Debug)]
pub struct CompleteJob {
pub job_key: i64,
pub variables: Option<String>,
}
impl CompleteJob {
pub fn new(job_key: i64, variables: Option<String>) -> Self {
Self { job_key, variables }
}
pub fn variables<S: Serialize>(mut self, variables: &S) -> Result<Self, Error> {
serde_json::to_string(variables)
.map_err(|e| Error::JsonError(e))
.map(move |v| {
self.variables = Some(v);
self
})
}
}
impl Into<gateway::CompleteJobRequest> for CompleteJob {
fn into(self) -> gateway::CompleteJobRequest {
let mut complete_job_request = gateway::CompleteJobRequest::default();
complete_job_request.set_jobKey(self.job_key);
if let Some(variables) = self.variables {
complete_job_request.set_variables(variables);
}
complete_job_request
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ActivateJobs {
pub worker: String,
pub job_type: String,
pub timeout: i64,
pub max_jobs_to_activate: i32,
}
impl ActivateJobs {
pub fn new<S1: Into<String>, S2: Into<String>>(
worker: S1,
job_type: S2,
timeout: i64,
max_jobs_to_activate: i32,
) -> Self {
ActivateJobs {
worker: worker.into(),
job_type: job_type.into(),
timeout,
max_jobs_to_activate,
}
}
}
impl Into<gateway::ActivateJobsRequest> for ActivateJobs {
fn into(self) -> gateway::ActivateJobsRequest {
let mut activate_jobs_request = gateway::ActivateJobsRequest::default();
activate_jobs_request.set_maxJobsToActivate(self.max_jobs_to_activate);
activate_jobs_request.set_timeout(self.timeout);
activate_jobs_request.set_worker(self.worker);
activate_jobs_request.set_field_type(self.job_type);
activate_jobs_request
}
}
#[derive(Debug)]
pub struct ActivatedJobs {
pub activated_jobs: Vec<ActivatedJob>,
}
impl ActivatedJobs {
pub fn new(ajr: gateway::ActivateJobsResponse) -> Self {
let activated_jobs: Vec<ActivatedJob> = ajr.jobs.into_iter().map(From::from).collect();
ActivatedJobs { activated_jobs }
}
}
#[derive(Clone, Debug)]
pub struct ActivatedJob {
pub key: i64,
pub field_type: String,
pub custom_headers: String,
pub worker: String,
pub retries: i32,
pub deadline: i64,
pub variables: String,
}
impl From<gateway::ActivatedJob> for ActivatedJob {
fn from(aj: gateway::ActivatedJob) -> Self {
ActivatedJob {
key: aj.key,
variables: aj.variables,
worker: aj.worker,
retries: aj.retries,
deadline: aj.deadline,
custom_headers: aj.customHeaders,
field_type: aj.field_type,
}
}
}