job_runner/lib.rs
1//! A simple [JobRunner] which gives each job a dedicated thread and allows for
2//! configurable delays between each invocation of the job's logic.
3//!
4//! # Example
5//!
6//! A program using the [JobRunner] is expected to have this basic outline:
7//!
8//! ```rust
9//! use job_runner::{Job, JobRunner, fixed_delay};
10//!
11//! fn main() {
12//! // At program startup, create the JobRunner and register your tasks.
13//! let mut job_runner = JobRunner::new();
//!     job_runner.start(Job::new(
//!         "cool_job",
//!         fixed_delay(std::time::Duration::from_secs(5)),
//!         my_cool_job)).expect("failed to spawn the job thread");
18//!
19//! // Do other things in your program...
20//!
21//! // Then, when shutting down your program, signal all the job threads
22//! // to stop running.
23//! job_runner.stop_all();
24//!
25//! // Maybe signal other parts of your program to gracefully shut down too...
26//!
27//! // Finally (and optionally) wait for the job threads to actually exit.
28//! job_runner.join_all();
29//! }
30//!
31//! fn my_cool_job() {
//!     // Do cool things...
33//! }
34//! ```
35
36#![deny(rustdoc::broken_intra_doc_links)]
37#![deny(missing_docs)]
38#![forbid(unsafe_code)]
39
40use std::{time::{Instant, Duration}, sync::{Arc, Mutex, Condvar}, thread::JoinHandle, collections::HashMap};
41
42#[cfg(feature = "tracing")]
43use tracing::{info, warn, info_span};
44
/// Decides when a job runs.
///
/// The [JobRunner] simply calls a job's logic over and over until the runner is shut
/// down, pausing between calls. A [Schedule] supplies the length of each pause, which
/// may be the same every time or may vary from run to run.
pub trait Schedule: Send + 'static {
    /// Returns how long to wait before the next job execution starts. The delay is
    /// measured from the moment this method is called. Implementations that need to
    /// align with wall-clock or monotonic time typically consult something like
    /// [Instant::now()](std::time::Instant::now).
    ///
    /// The precision of the actual pause is whatever the OS timer provides. No spin
    /// loops or other high-precision tricks are used; the wait is ultimately performed
    /// with [Condvar::wait_timeout](std::sync::Condvar::wait_timeout).
    fn next_start_delay(&mut self) -> Duration;
}
59
60impl<T> Schedule for T where T : FnMut() -> Duration + Send + 'static {
61 fn next_start_delay(&mut self) -> Duration {
62 self()
63 }
64}
65
66/// Returns a [Schedule] which runs the job constantly, as fast as possible.
67pub fn spin() -> impl Schedule {
68 || Duration::ZERO
69}
70
71/// Returns a [Schedule] which inserts a fixed delay between the end of one job
72/// execution and the start of the next. Note that this means that how often jobs
73/// execute depends on how long jobs take to run.
74pub fn fixed_delay(delay: Duration) -> impl Schedule {
75 move || delay
76}
77
/// Returns a [Schedule] that runs jobs according to a cron expression.
///
/// Executions that would overlap a still-running execution are skipped, not queued.
/// For example, if a job is scheduled to run every second but takes 5 seconds to run,
/// the 4 executions that should have happened while the slow job was running are
/// skipped. Only every 5th scheduled execution actually happens.
///
/// # Errors
///
/// Returns the cron parser's error if `schedule` is not a valid cron expression.
///
/// NOTE(review): once the expression has no upcoming fire times left, this schedule
/// returns a zero delay, so the job then runs back-to-back. Confirm this is intended.
#[cfg(feature = "cron")]
pub fn cron(schedule: &str) -> Result<impl Schedule, ::cron::error::Error> {
    use std::str::FromStr;
    let parsed = ::cron::Schedule::from_str(schedule)?;
    Ok(move || match parsed.upcoming(chrono::Utc).next() {
        // If the gap cannot be expressed as a std Duration (e.g. the fire time has
        // already slipped past), run immediately.
        Some(next_fire) => next_fire
            .signed_duration_since(chrono::Utc::now())
            .to_std()
            .unwrap_or(Duration::ZERO),
        None => Duration::ZERO,
    })
}
93
94/// A description of a job that can be registered with a [JobRunner].
95pub struct Job {
96 name: String,
97 schedule: Box<dyn Schedule + Send + 'static>,
98 logic: Box<dyn FnMut() + Send + 'static>,
99 thread_builder: Option<Box<dyn FnOnce() -> std::thread::Builder>>,
100}
101
102impl Job {
103 /// Construct a new [Job] with a name, schedule, and the actual job logic.
104 pub fn new(name: impl Into<String>, schedule: impl Schedule, logic: impl FnMut() + Send + 'static) -> Self {
105 Self {
106 name: name.into(),
107 schedule: Box::new(schedule),
108 logic: Box::new(logic),
109 thread_builder: None,
110 }
111 }
112
113 /// Optional setting which allows you to customize the thread on which this job will
114 /// be executed. If this function is not called, the default thread builder sets the
115 /// thread name to the name of the job and does not specify an explicit stack size.
116 pub fn thread_builder(self, thread_builder: impl FnOnce() -> std::thread::Builder + 'static) -> Self {
117 Self {
118 thread_builder: Some(Box::new(thread_builder)),
119 ..self
120 }
121 }
122}
123
124/// The main coordinator for running jobs. It exposes methods to start and stop jobs,
125/// as well as to get the status of a job or all jobs.
126///
127/// Each job added to the [JobRunner] is given a dedicated thread to execute on, therefore
128/// the number of threads created by the [JobRunner] is equal to the number of jobs
129/// which are [started](JobRunner::start).
130pub struct JobRunner {
131 join_on_drop: bool,
132 jobs: HashMap<String, JobHandle>,
133}
134
135impl Default for JobRunner {
136 fn default() -> Self {
137 Self::new()
138 }
139}
140
141impl JobRunner {
142 /// Initialize a new [JobRunner] with no jobs started yet.
143 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
144 pub fn new() -> Self {
145 Self {
146 join_on_drop: false,
147 jobs: HashMap::new(),
148 }
149 }
150
151 /// Allows you to configure the [JobRunner] to wait for job threads to exit
152 /// when [dropped](Drop::drop). The default value for this option is `false`,
153 /// which is equivalent to calling the [stop_all](JobRunner::stop_all) method
154 /// at drop time. Passing `true` for this option is equivalent to calling the
155 /// [join_all](JobRunner::join_all) method at drop time.
156 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
157 pub fn join_on_drop(&mut self, join_on_drop: bool) -> &mut Self {
158 self.join_on_drop = join_on_drop;
159 self
160 }
161
162 /// Gets the latest status of a specific job by the job's name.
163 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
164 pub fn status(&self, job_name: &str) -> Option<JobStatus> {
165 self.jobs.get(job_name).and_then(|handle| {
166 Some(handle.status.lock().ok()?.clone())
167 })
168 }
169
170 /// Gets an iterator over all job statuses. The iterator item tuple's first entry
171 /// is the name of the job.
172 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
173 pub fn statuses(&self) -> impl Iterator<Item = (&String, JobStatus)> {
174 self.jobs.iter().flat_map(|(name, handle)| {
175 let status = match handle.status.lock() {
176 Ok(status) => status,
177 Err(_) => return None
178 };
179 Some((name, status.clone()))
180 })
181 }
182
183 /// Request that a specific job execute immediately. How soon the job executes
184 /// depends on whether it is currently executing, or whether it is sleeping waiting
185 /// for its next regular execution. If the job is currently executing, then once the
186 /// current execution ends, the job will immediately begin executing again rather
187 /// than sleeping. If the job is currently sleeping, then the sleep will be interrupted
188 /// and the job will begin executing on its dedicated thread.
189 ///
190 /// No matter how many times method is called before the job thread is actually able to
191 /// start the next execution, the job will only execute once for all those requests. This
192 /// can happen if for example a long-running job is executing and `request_execution` is
193 /// called multiple times before the currently-executing run finishes. In that case, the
194 /// job will immediately begin executing after the current run finishes, but after that
195 /// follow up run finishes then the job will go back to its normal schedule (assuming no
196 /// other `request_execution` calls have arrived in the mean time).
197 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
198 pub fn request_execution(&self, job_name: &str) {
199 let handle = match self.jobs.get(job_name) {
200 Some(h) => h,
201 None => {
202 #[cfg(feature = "tracing")]
203 info!("No job named {} is currently registered", job_name);
204 return;
205 }
206 };
207 {
208 let mut guard = match handle.shutdown.0.lock() {
209 Ok(g) => g,
210 Err(_) => {
211 #[cfg(feature = "tracing")]
212 warn!("Unable to request execution of job {} because poisoned shutdown mutex lock encountered", job_name);
213 return;
214 }
215 };
216 guard.1 = guard.1.saturating_add(1);
217 }
218 handle.shutdown.1.notify_one();
219 }
220
221 /// Registers a job and starts it executing on a dedicated thread. The job schedule's
222 /// [Schedule::next_start_delay] method will be called to determine when the
223 /// first job execution should occur.
224 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
225 pub fn start(&mut self, job: Job) -> std::io::Result<()> {
226 let status = Arc::new(Mutex::new(JobStatus::default()));
227 let shutdown = Arc::new((Mutex::new((false, 0)), Condvar::new()));
228 let thread_builder = match job.thread_builder {
229 Some(thread_builder) => thread_builder(),
230 None => std::thread::Builder::new()
231 .name(job.name.clone())
232 };
233 let join_handle = thread_builder.spawn({
234 let status = Arc::clone(&status);
235 let shutdown = Arc::clone(&shutdown);
236 let name = job.name.clone();
237 move || {
238 run_job(name, job.schedule, job.logic, status, shutdown)
239 }
240 })?;
241 let prev_handle = self.jobs.insert(job.name, JobHandle {
242 status,
243 shutdown,
244 join_handle,
245 });
246 if let Some(handle) = prev_handle {
247 handle.shutdown.0.lock().unwrap().0 = true;
248 let _ = handle.join_handle.join();
249 }
250 Ok(())
251 }
252
253 /// Signal to all jobs to stop executing. This will prevent any further job runs from
254 /// starting, but will not preemptively interrupt any currently-executing job runs.
255 /// Although the [join_all](JobRunner::join_all) method also signals all jobs to stop
256 /// executing, this method can still be useful to call at the start of application shut
257 /// down, if you have other parts of the program that you want to begin shutting down
258 /// too before calling the blocking [join_all](JobRunner::join_all) method. This method
259 /// signals, but does not block.
260 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
261 pub fn stop_all(&mut self) {
262 #[cfg(feature = "tracing")]
263 info!("Signaling {} jobs to stop", self.jobs.len());
264 for (name, handle) in &mut self.jobs {
265 #[cfg(feature = "tracing")]
266 let _span = info_span!("stop_job", job = name);
267 if let Ok(mut guard) = handle.shutdown.0.lock() {
268 if !guard.0 {
269 #[cfg(feature = "tracing")]
270 info!("Signaled job to shut down");
271 guard.0 = true;
272 }
273 } else {
274 #[cfg(feature = "tracing")]
275 warn!("Received poison error when trying to acquire shutdown signal lock");
276 }
277 }
278 }
279
280 /// Signal to all jobs to stop executing and then waits for the job threads to
281 /// exit before returning. Jobs that are waiting for the next scheduled run will
282 /// exit immediately, but currently-executing jobs will be allowed to complete
283 /// their current run - the [JobRunner] does not itself define any mechanism for
284 /// preemptively interrupting running jobs. That means that how long this method
285 /// takes to execute depends on how long the slowest currently-running job takes
286 /// to finish its run. If you have particularly long-running jobs, you may want
287 /// to pass them a separate cancellation token that you call before invoking this
288 /// method.
289 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
290 pub fn join_all(&mut self) {
291 self.stop_all();
292 #[cfg(feature = "tracing")]
293 info!("Joining {} jobs", self.jobs.len());
294 for (name, handle) in self.jobs.drain() {
295 #[cfg(feature = "tracing")]
296 let _span = info_span!("join_job", job = name);
297 match handle.join_handle.join() {
298 Ok(()) => {
299 #[cfg(feature = "tracing")]
300 info!("Job thread exited normally");
301 },
302 Err(_) => {
303 #[cfg(feature = "tracing")]
304 warn!("Job thread exited with a panic");
305 }
306 }
307 }
308 }
309}
310
311impl Drop for JobRunner {
312 fn drop(&mut self) {
313 if self.join_on_drop {
314 self.join_all();
315 } else {
316 self.stop_all();
317 }
318 }
319}
320
321struct JobHandle {
322 status: Arc<Mutex<JobStatus>>,
323 shutdown: Arc<(Mutex<(bool, usize)>, Condvar)>,
324 join_handle: JoinHandle<()>,
325}
326
/// A snapshot of the current status of a job, as reported by [JobRunner::status]
/// and [JobRunner::statuses].
///
/// Snapshots can be compared with `==`, e.g. to detect whether anything changed
/// between two polls.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct JobStatus {
    /// Number of times the job logic has been started. This value is incremented when
    /// an execution begins, not when it ends.
    pub runs: usize,
    /// `true` while the job logic is executing; `false` while the job is waiting for
    /// its next run.
    pub running: bool,
    /// The time at which the latest finished job run started. [None] if no run has
    /// finished yet.
    pub latest_start_time: Option<Instant>,
    /// The time at which the latest finished job run ended. [None] if no run has
    /// finished yet.
    pub latest_end_time: Option<Instant>,
    /// The time at which the currently running execution started. [None] whenever
    /// the job is not executing.
    pub current_start_time: Option<Instant>,
    /// When the next job execution is scheduled for. [None] while the job is
    /// executing.
    pub next_start_time: Option<Instant>,
}
349
350fn run_job(
351 name: String,
352 mut schedule: Box<dyn Schedule>,
353 mut logic: Box<dyn FnMut()>,
354 status: Arc<Mutex<JobStatus>>,
355 shutdown: Arc<(Mutex<(bool, usize)>, Condvar)>) {
356 #[cfg(feature = "tracing")]
357 let _fn_span = tracing::info_span!("run_job", job = name);
358 loop {
359 let next_start_time = {
360 #[cfg(feature = "tracing")]
361 let _span = info_span!("job_schedule");
362 #[cfg(feature = "tracing")]
363 info!("Invoking job schedule");
364 Instant::now() + schedule.next_start_delay()
365 };
366
367 // Update the JobStatus for the next start time.
368 {
369 #[cfg(feature = "tracing")]
370 let _span = info_span!("job_next_start_status_update");
371 #[cfg(feature = "tracing")]
372 info!("Updating job status for next start time schedule");
373 let mut status = match status.lock() {
374 Ok(status) => status,
375 Err(_) => {
376 #[cfg(feature = "tracing")]
377 warn!("Job exiting run loop due to poison error when locking status for next start time update");
378 break;
379 }
380 };
381 status.next_start_time = Some(next_start_time);
382 };
383
384 let sleep_result = {
385 #[cfg(feature = "tracing")]
386 let _span = info_span!("job_sleep");
387 sleep_until(next_start_time, &shutdown)
388 };
389 if sleep_result.should_exit_job() {
390 #[cfg(feature = "tracing")]
391 info!("Job run loop will exit due to sleep result {:?}", sleep_result);
392 break;
393 }
394
395 // Update the JobStatus for the start of the current run.
396 let latest_start_time = {
397 #[cfg(feature = "tracing")]
398 let _span = info_span!("job_start_status_update");
399 #[cfg(feature = "tracing")]
400 info!("Updating job status for start of current run");
401 let now = Instant::now();
402 let mut status = match status.lock() {
403 Ok(status) => status,
404 Err(_) => {
405 #[cfg(feature = "tracing")]
406 warn!("Job exiting run loop due to poison error when locking status for start of job execution");
407 break;
408 }
409 };
410 status.runs += 1;
411 status.running = true;
412 status.current_start_time = Some(now);
413 status.next_start_time = None;
414 now
415 };
416
417 // Invoke the logic.
418 {
419 #[cfg(feature = "tracing")]
420 let _span = info_span!("job_logic");
421 #[cfg(feature = "tracing")]
422 info!("Invoking job logic");
423 logic();
424 }
425
426 // Update the JobStatus for the end of the current run.
427 {
428 #[cfg(feature = "tracing")]
429 let _span = info_span!("job_end_status_update");
430 #[cfg(feature = "tracing")]
431 info!("Updating job status for end of current run");
432 let now = Instant::now();
433 let mut status = match status.lock() {
434 Ok(status) => status,
435 Err(_) => {
436 #[cfg(feature = "tracing")]
437 warn!("Job exiting run loop due to poison error when locking status for end of job execution");
438 break;
439 }
440 };
441 status.running = false;
442 status.current_start_time = None;
443 status.latest_start_time = Some(latest_start_time);
444 status.latest_end_time = Some(now);
445 };
446 }
447}
448
449fn sleep_until(target_time: Instant, shutdown: &Arc<(Mutex<(bool, usize)>, Condvar)>) -> SleepResult {
450 let mut guard = match shutdown.0.lock() {
451 Ok(guard) => guard,
452 Err(_) => {
453 #[cfg(feature = "tracing")]
454 warn!("Sleep loop encountered poisoned shutdown mutex when acquiring initial shutdown signal lock, treating as shutdown signal");
455 return SleepResult::Shutdown;
456 }
457 };
458 loop {
459 let (is_shutdown, execute_requests) = *guard;
460 if is_shutdown {
461 #[cfg(feature = "tracing")]
462 info!("Sleep loop exiting early due to shutdown signal being true");
463 return SleepResult::Shutdown;
464 }
465 if execute_requests > 0 {
466 #[cfg(feature = "tracing")]
467 info!("Sleep loop exiting early due to the presence of {} execute requests, which have been reset to 0", execute_requests);
468 guard.1 = 0;
469 return SleepResult::ExecuteRequested;
470 }
471 let time_to_wait = Instant::now().saturating_duration_since(target_time);
472 if time_to_wait.is_zero() {
473 #[cfg(feature = "tracing")]
474 info!("Sleep loop finished waiting for time to pass");
475 return SleepResult::SleepFinished;
476 }
477 match shutdown.1.wait_timeout(guard, time_to_wait) {
478 Ok((g, _)) => {
479 guard = g
480 },
481 Err(_) => {
482 #[cfg(feature = "tracing")]
483 warn!("Sleep loop saw poisoned shutdown mutex while sleeping, treating as shutdown signal");
484 return SleepResult::Shutdown;
485 }
486 }
487 }
488}
489
/// Why a job's sleep between runs came to an end.
#[derive(Debug)]
enum SleepResult {
    /// A shutdown was signaled (or the shutdown lock was poisoned).
    Shutdown,
    /// Someone asked for the job to run right away.
    ExecuteRequested,
    /// The scheduled start time was reached.
    SleepFinished,
}
496
497impl SleepResult {
498 fn should_exit_job(&self) -> bool {
499 match self {
500 SleepResult::Shutdown => true,
501 SleepResult::ExecuteRequested
502 | SleepResult::SleepFinished => false,
503 }
504 }
505}