use crate::client::Client;
use crate::error::{Error, Result};
use crate::job::Job;
use crate::proto;
use crate::worker::{
auto_handler::{Extensions, FromJob, HandlerFactory, State},
job_dispatcher, JobPoller, PollMessage,
};
use futures::future::LocalBoxFuture;
use futures::FutureExt;
use serde::Serialize;
use serde_json::json;
use std::fmt;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::{sync::mpsc, time::interval};
use tokio_stream::{
wrappers::{IntervalStream, ReceiverStream},
StreamExt,
};
use tracing_futures::Instrument;
/// Default value for the `timeout` field of the job activation request (five minutes).
static DEFAULT_JOB_TIMEOUT: Duration = Duration::from_secs(300);
/// [`DEFAULT_JOB_TIMEOUT`] expressed in milliseconds, the unit stored in the request.
static DEFAULT_JOB_TIMEOUT_IN_MS: i64 = DEFAULT_JOB_TIMEOUT.as_millis() as i64;
/// Default for the request's `max_jobs_to_activate` field.
static DEFAULT_JOB_WORKER_MAX_JOB_ACTIVE: u32 = 32;
/// Default concurrency value handed to the job dispatcher.
static DEFAULT_JOB_WORKER_CONCURRENCY: u32 = 4;
/// Default period of the timer that asks the poller to fetch jobs.
static DEFAULT_JOB_WORKER_POLL_INTERVAL: Duration = Duration::from_millis(100);
/// Default fraction of the maximum active jobs used to derive the poller threshold.
static DEFAULT_JOB_WORKER_POLL_THRESHOLD: f32 = 0.3;
/// Extra time added on top of the request timeout to form the builder's own `request_timeout`.
static REQUEST_TIMEOUT_OFFSET: Duration = Duration::from_secs(10);
/// Default value for the `request_timeout` field of the job activation request.
static DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Clone)]
pub(crate) struct JobHandler(Arc<dyn Fn(Client, Job) -> LocalBoxFuture<'static, ()>>);
impl JobHandler {
pub(crate) fn call(&self, client: Client, job: Job) -> LocalBoxFuture<'static, ()> {
self.0(client, job)
}
}
/// The wrapped closure cannot be inspected, so `Debug` prints only the type name.
impl fmt::Debug for JobHandler {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "JobHandler")
    }
}
/// Builder that collects the configuration of a job worker and starts it with
/// [`JobWorkerBuilder::run`].
#[derive(Debug)]
pub struct JobWorkerBuilder {
/// Client cloned into the job poller and handed to the job dispatcher by `run`.
client: Client,
/// Handler invoked for jobs; `run` returns an error while this is `None`.
handler: Option<JobHandler>,
/// Extension storage; `with_state` inserts `State` values here and `run`
/// passes it to the job dispatcher.
data: Extensions,
/// Concurrency value passed (as `usize`) to the job dispatcher.
concurrency: u32,
/// Period of the interval that emits `PollMessage::FetchJobs` to the poller.
poll_interval: Duration,
/// Fraction of `request.max_jobs_to_activate`; the floored product becomes
/// the poller's `threshold`.
poll_threshold: f32,
/// Job activation request that the setters fill in and `run` moves into the poller.
request: proto::ActivateJobsRequest,
/// Timeout handed to the poller: the request's own timeout plus
/// `REQUEST_TIMEOUT_OFFSET`.
request_timeout: Duration,
}
impl JobWorkerBuilder {
pub fn new(client: Client) -> Self {
JobWorkerBuilder {
client,
handler: None,
data: Extensions::new(),
concurrency: DEFAULT_JOB_WORKER_CONCURRENCY,
poll_interval: DEFAULT_JOB_WORKER_POLL_INTERVAL,
poll_threshold: DEFAULT_JOB_WORKER_POLL_THRESHOLD,
request: proto::ActivateJobsRequest {
r#type: String::new(),
worker: String::from("default"),
timeout: DEFAULT_JOB_TIMEOUT_IN_MS,
max_jobs_to_activate: DEFAULT_JOB_WORKER_MAX_JOB_ACTIVE as i32,
fetch_variable: Vec::new(),
request_timeout: DEFAULT_REQUEST_TIMEOUT.as_millis() as i64,
},
request_timeout: DEFAULT_REQUEST_TIMEOUT + REQUEST_TIMEOUT_OFFSET,
}
}
/// Sets the job type stored in the activation request.
///
/// `run` rejects the builder while the job type is empty.
pub fn with_job_type<T: Into<String>>(self, job_type: T) -> Self {
    let mut builder = self;
    builder.request.r#type = job_type.into();
    builder
}
/// Sets the worker name stored in the activation request.
///
/// `run` also passes a copy of this name to the job dispatcher.
pub fn with_worker_name<T: Into<String>>(self, worker: T) -> Self {
    let mut builder = self;
    builder.request.worker = worker.into();
    builder
}
/// Sets the `timeout` field of the activation request, in milliseconds.
///
/// Durations whose millisecond count does not fit in an `i64` are clamped to
/// `i64::MAX` instead of wrapping around.
pub fn with_timeout(mut self, timeout: Duration) -> Self {
    // `as_millis` yields a `u128`; a bare `as i64` would silently wrap very
    // large durations into arbitrary (possibly negative) values.
    self.request.timeout = timeout.as_millis().min(i64::MAX as u128) as i64;
    self
}
/// Sets the request timeout.
///
/// Two values are updated: the `request_timeout` field of the activation
/// request (in milliseconds), and the builder's own `request_timeout`, which
/// is `request_timeout` plus `REQUEST_TIMEOUT_OFFSET` and is later handed to
/// the job poller.
///
/// Extremely large durations are handled without panicking: the millisecond
/// value is clamped to `i64::MAX`, and if adding the offset would overflow
/// `Duration`, the unmodified `request_timeout` is used instead.
pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
    // `as_millis` yields a `u128`; clamp before narrowing so the cast cannot wrap.
    self.request.request_timeout = request_timeout.as_millis().min(i64::MAX as u128) as i64;
    // `Duration + Duration` panics on overflow (e.g. for `Duration::MAX`).
    self.request_timeout = request_timeout
        .checked_add(REQUEST_TIMEOUT_OFFSET)
        .unwrap_or(request_timeout);
    self
}
/// Sets the `max_jobs_to_activate` field of the activation request.
///
/// The request stores this as an `i32`, so values above `i32::MAX` are
/// clamped to `i32::MAX` instead of wrapping to a negative number.
///
/// The value should be greater than zero: `run` uses it as the capacity of
/// its bounded job queue.
pub fn with_max_jobs_active(mut self, max_jobs_active: u32) -> Self {
    self.request.max_jobs_to_activate = max_jobs_active.min(i32::MAX as u32) as i32;
    self
}
pub fn with_concurrency(self, concurrency: u32) -> Self {
JobWorkerBuilder {
concurrency,
..self
}
}
pub fn with_handler<T, R>(self, handler: T) -> Self
where
T: Fn(Client, Job) -> R + 'static,
R: Future<Output = ()> + 'static,
{
JobWorkerBuilder {
handler: Some(JobHandler(Arc::new(move |mut client, job| {
client.current_job_key = Some(job.key());
Box::pin(handler(client, job))
}))),
..self
}
}
/// Installs a handler that extracts its input from the job and reports the
/// outcome automatically.
///
/// For each job, `T::from_job` extracts the handler parameters:
///
/// * If extraction succeeds, `handler.call(params)` runs. An `Ok` value is
///   converted with `json!` and sent as the variables of a complete-job
///   request. An `Err` value is sent as the error message (its `to_string()`)
///   of a fail-job request.
/// * If extraction fails, an error is logged and a fail-job request is sent
///   with a message describing the deserialization failure.
///
/// The results of the complete/fail requests are discarded.
///
/// The job type is cloned from the builder when this method is called and
/// used as the span's `otel.name`. Call `with_job_type` first; otherwise the
/// span carries whatever the type was at that point (empty by default).
pub fn with_auto_handler<F, T, R, O, E>(self, handler: F) -> Self
where
F: HandlerFactory<T, R, O, E>,
T: FromJob,
R: Future<Output = std::result::Result<O, E>> + 'static,
O: Serialize,
E: std::error::Error,
{
// Snapshot of the job type at configuration time; only used to name the span.
let job_type = self.request.r#type.clone();
self.with_handler(move |client, job| {
let span = tracing::info_span!(
"auto_handler",
otel.name = %job_type,
instance = job.process_instance_key(),
job = job.key(),
);
// `left_future` / `right_future` wrap the differing branch futures so every
// arm of each `match` has the same type.
match T::from_job(&client, &job) {
Ok(params) => handler
.call(params)
.then(move |result| match result {
Ok(variables) => client
.complete_job()
.with_variables(json!(variables))
.send()
.map(|_| ())
.left_future(),
Err(err) => client
.fail_job()
.with_error_message(err.to_string())
.send()
.map(|_| ())
.right_future(),
})
.left_future()
.instrument(span),
Err(err) => {
// Log inside the span so the event is attributed to this job.
span.in_scope(|| {
tracing::error!(%err, "variables do not deserialize to expected type");
});
client
.fail_job()
.with_error_message(format!(
"variables do not deserialize to expected type: {:?}",
err
))
.send()
.map(|_| ())
.right_future()
.instrument(span)
}
}
})
}
/// Wraps `t` in a [`State`] and stores it in the builder's extension data,
/// which `run` passes on to the job dispatcher.
pub fn with_state<T: 'static>(self, t: T) -> Self {
    let mut builder = self;
    let state = State::new(t);
    builder.data.insert(state);
    builder
}
/// Replaces the `fetch_variable` list of the activation request with
/// `fetch_variables`.
pub fn with_fetch_variables(self, fetch_variables: Vec<String>) -> Self {
    let mut builder = self;
    builder.request.fetch_variable = fetch_variables;
    builder
}
pub async fn run(self) -> Result<()> {
if self.request.r#type.is_empty() || self.handler.is_none() {
return Err(Error::InvalidParameters(
"`job_type` and `handler` must be set",
));
}
let (job_queue, job_queue_rx) = mpsc::channel(self.request.max_jobs_to_activate as usize);
let (poll_queue, poll_rx) = mpsc::channel(32);
let poll_interval =
IntervalStream::new(interval(self.poll_interval)).map(|_| PollMessage::FetchJobs);
let worker_name = self.request.worker.clone();
let job_poller = JobPoller {
client: self.client.clone(),
request_timeout: self.request_timeout,
request_in_progress: false,
max_jobs_active: self.request.max_jobs_to_activate as u32,
job_queue,
message_sender: poll_queue.clone(),
messages: Box::pin(futures::stream::select(
ReceiverStream::new(poll_rx),
poll_interval,
)),
remaining: 0,
threshold: (self.request.max_jobs_to_activate as f32 * self.poll_threshold).floor()
as u32,
request: self.request,
};
futures::join!(
job_poller,
job_dispatcher::run(
job_queue_rx,
poll_queue,
self.concurrency as usize,
self.handler.unwrap(),
self.client.clone(),
worker_name,
self.data,
)
);
Ok(())
}
}