#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
#![cfg_attr(feature = "fail-on-warnings", deny(clippy::all))]
#![forbid(unsafe_code)]
mod config;
mod current;
mod dispatcher;
mod entity;
mod handle;
mod migrate;
mod notification_router;
mod poller;
mod registry;
mod repo;
mod runner;
mod spawner;
mod tracker;
pub mod error;
use es_entity::AtomicOperation;
use tracing::instrument;
use std::sync::{Arc, Mutex};
use std::time::Duration;
pub use config::*;
pub use current::*;
pub use entity::{Job, JobCompletionResult, JobResult, JobTerminalState, JobType};
pub use es_entity::clock::{Clock, ClockController, ClockHandle};
pub use migrate::*;
pub use registry::*;
pub use runner::*;
pub use spawner::*;
use error::*;
use notification_router::*;
use poller::*;
use repo::*;
use tracker::*;
es_entity::entity_id! { JobId }
#[derive(Clone)]
pub struct Jobs {
config: JobSvcConfig,
repo: Arc<JobRepo>,
registry: Arc<Mutex<Option<JobRegistry>>>,
router: Arc<JobNotificationRouter>,
poller_handle: Option<Arc<JobPollerHandle>>,
clock: ClockHandle,
}
impl Jobs {
pub async fn init(config: JobSvcConfig) -> Result<Self, JobError> {
let pool = match (config.pool.clone(), config.pg_con.clone()) {
(Some(pool), None) => pool,
(None, Some(pg_con)) => {
let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
if let Some(max_connections) = config.max_connections {
pool_opts = pool_opts.max_connections(max_connections);
}
pool_opts.connect(&pg_con).await?
}
_ => {
return Err(JobError::Config(
"One of pg_con or pool must be set".to_string(),
));
}
};
if config.exec_migrations {
sqlx::migrate!().run(&pool).await?;
}
let repo = Arc::new(JobRepo::new(&pool));
let registry = Arc::new(Mutex::new(Some(JobRegistry::new())));
let router = Arc::new(JobNotificationRouter::new(&pool, Arc::clone(&repo)));
let clock = config.clock.clone();
Ok(Self {
repo,
config,
registry,
router,
poller_handle: None,
clock,
})
}
pub async fn start_poll(&mut self) -> Result<(), JobError> {
let registry = self
.registry
.lock()
.expect("Couldn't lock Registry Mutex")
.take()
.expect("Registry has been consumed by executor");
let tracker = Arc::new(JobTracker::new(
self.config.poller_config.min_jobs_per_process,
self.config.poller_config.max_jobs_per_process,
));
let poller = JobPoller::new(
self.config.poller_config.clone(),
Arc::clone(&self.repo),
registry,
Arc::clone(&tracker),
self.clock.clone(),
);
let job_types = poller.registered_job_types();
let (listener_handle, waiter_handle) =
self.router.start(Arc::clone(&tracker), job_types).await?;
let poller_handle = poller.start(listener_handle, waiter_handle);
self.poller_handle = Some(Arc::new(poller_handle));
Ok(())
}
pub fn add_initializer<I: JobInitializer>(&mut self, initializer: I) -> JobSpawner<I::Config> {
let job_type = {
let mut registry = self.registry.lock().expect("Couldn't lock Registry Mutex");
registry
.as_mut()
.expect("Registry has been consumed by executor")
.add_initializer(initializer)
};
JobSpawner::new(Arc::clone(&self.repo), job_type, self.clock.clone())
}
#[instrument(name = "job.find", skip(self))]
pub async fn find(&self, id: JobId) -> Result<Job, JobError> {
Ok(self.repo.find_by_id(id).await?)
}
#[instrument(name = "job.cancel_job", skip(self))]
pub async fn cancel_job(&self, id: JobId) -> Result<(), JobError> {
let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
let mut job = self.repo.find_by_id(id).await?;
if job.cancel().did_execute() {
let result = sqlx::query!(
r#"DELETE FROM job_executions WHERE id = $1 AND state = 'pending'"#,
id as JobId,
)
.execute(op.as_executor())
.await?;
if result.rows_affected() == 0 {
return Err(JobError::CannotCancelJob);
}
self.repo.update_in_op(&mut op, &mut job).await?;
op.commit().await?;
}
Ok(())
}
pub fn clock(&self) -> &ClockHandle {
&self.clock
}
#[instrument(name = "job.await_completion", skip(self))]
pub async fn await_completion(
&self,
id: JobId,
timeout: Option<Duration>,
) -> Result<JobCompletionResult, JobError> {
self.find(id).await?;
let rx = self.router.wait_for_terminal(id);
let state = match timeout {
Some(duration) => tokio::time::timeout(duration, rx)
.await
.map_err(|_| JobError::TimedOut(id))?
.map_err(|_| JobError::AwaitCompletionShutdown(id))?,
None => rx
.await
.map_err(|_| JobError::AwaitCompletionShutdown(id))?,
};
let job = self.find(id).await?;
Ok(JobCompletionResult::new(state, job.raw_result().cloned()))
}
#[instrument(name = "job.poll_completion", skip(self))]
pub async fn poll_completion(&self, id: JobId) -> Result<Option<JobTerminalState>, JobError> {
let job = self.find(id).await?;
Ok(job.terminal_state())
}
#[instrument(name = "job.shutdown", skip(self), err)]
pub async fn shutdown(&self) -> Result<(), JobError> {
if let Some(handle) = &self.poller_handle {
handle.shutdown().await?;
}
Ok(())
}
}