job_runner/
lib.rs

//! A simple [JobRunner] which gives each job a dedicated thread and allows for
//! configurable delays between each invocation of the job's logic.
//!
//! # Example
//!
//! A program using the [JobRunner] is expected to have this basic outline:
//!
//! ```rust
//! use job_runner::{Job, JobRunner, fixed_delay};
//!
//! fn main() {
//!     // At program startup, create the JobRunner and register your tasks.
//!     // Starting a job spawns its thread, which can fail, so handle the Result.
//!     let mut job_runner = JobRunner::new();
//!     job_runner
//!         .start(Job::new(
//!             "cool_job",
//!             fixed_delay(std::time::Duration::from_secs(5)),
//!             my_cool_job))
//!         .expect("failed to spawn the job thread");
//!
//!     // Do other things in your program...
//!
//!     // Then, when shutting down your program, signal all the job threads
//!     // to stop running.
//!     job_runner.stop_all();
//!
//!     // Maybe signal other parts of your program to gracefully shut down too...
//!
//!     // Finally (and optionally) wait for the job threads to actually exit.
//!     job_runner.join_all();
//! }
//!
//! fn my_cool_job() {
//!     // Do cool things...
//! }
//! ```
35
36#![deny(rustdoc::broken_intra_doc_links)]
37#![deny(missing_docs)]
38#![forbid(unsafe_code)]
39
40use std::{time::{Instant, Duration}, sync::{Arc, Mutex, Condvar}, thread::JoinHandle, collections::HashMap};
41
42#[cfg(feature = "tracing")]
43use tracing::{info, warn, info_span};
44
/// Decides how long a job's dedicated thread waits before each run.
///
/// The [JobRunner] itself only loops (until it is shut down), alternating between
/// waiting and invoking the job's logic; a [Schedule] supplies the length of every
/// wait. That length may be the same each time or may vary from run to run.
pub trait Schedule: Send + 'static {
    /// Produces the delay, measured from the moment of the call, before the next job
    /// execution should begin. Implementations typically derive it from the current
    /// time, e.g. via [Instant::now()](std::time::Instant::now).
    ///
    /// Timing precision is whatever the operating system provides: the wait is carried
    /// out with [Condvar::wait_timeout](std::sync::Condvar::wait_timeout), and no spin
    /// loops or other high-precision tricks are used.
    fn next_start_delay(&mut self) -> Duration;
}
59
60impl<T> Schedule for T where T : FnMut() -> Duration + Send + 'static {
61    fn next_start_delay(&mut self) -> Duration {
62        self()
63    }
64}
65
66/// Returns a [Schedule] which runs the job constantly, as fast as possible.
67pub fn spin() -> impl Schedule {
68    || Duration::ZERO
69}
70
71/// Returns a [Schedule] which inserts a fixed delay between the end of one job
72/// execution and the start of the next. Note that this means that how often jobs
73/// execute depends on how long jobs take to run.
74pub fn fixed_delay(delay: Duration) -> impl Schedule {
75    move || delay
76}
77
/// Returns a [Schedule] which runs jobs on a cron schedule. If a job execution
/// runs overlong, then the executions which were overlapped will simply be skipped.
/// For example, if a job is scheduled to run every second, but takes 5 seconds to run,
/// then the 4 executions that should have happened while the slow job was executing
/// will be skipped - only every 5th scheduled job will actually execute.
///
/// If the next occurrence has already passed by the time the delay is computed, the
/// job runs immediately. Once the expression has no future occurrences at all (e.g.
/// a year field entirely in the past), the schedule returns a delay of roughly 100
/// years, so the job effectively stops running instead of re-running continuously.
///
/// # Errors
///
/// Returns the `cron` crate's parse error if `schedule` is not a valid cron expression.
#[cfg(feature = "cron")]
pub fn cron(schedule: &str) -> Result<impl Schedule, ::cron::error::Error> {
    use std::str::FromStr;

    // Stand-in for "never" once the cron expression is exhausted. Kept far below
    // `Duration::MAX` so that adding it to an `Instant` cannot overflow.
    const EXHAUSTED_SCHEDULE_DELAY: Duration = Duration::from_secs(100 * 365 * 24 * 60 * 60);

    let schedule = ::cron::Schedule::from_str(schedule)?;
    Ok(move || match schedule.upcoming(chrono::Utc).next() {
        // A negative difference (occurrence already passed) fails `to_std`, which
        // means "run now".
        Some(when) => when
            .signed_duration_since(chrono::Utc::now())
            .to_std()
            .unwrap_or(Duration::ZERO),
        None => EXHAUSTED_SCHEDULE_DELAY,
    })
}
93
94/// A description of a job that can be registered with a [JobRunner].
95pub struct Job {
96    name: String,
97    schedule: Box<dyn Schedule + Send + 'static>,
98    logic: Box<dyn FnMut() + Send + 'static>,
99    thread_builder: Option<Box<dyn FnOnce() -> std::thread::Builder>>,
100}
101
102impl Job {
103    /// Construct a new [Job] with a name, schedule, and the actual job logic.
104    pub fn new(name: impl Into<String>, schedule: impl Schedule, logic: impl FnMut() + Send + 'static) -> Self {
105        Self {
106            name: name.into(),
107            schedule: Box::new(schedule),
108            logic: Box::new(logic),
109            thread_builder: None,
110        }
111    }
112
113    /// Optional setting which allows you to customize the thread on which this job will
114    /// be executed. If this function is not called, the default thread builder sets the
115    /// thread name to the name of the job and does not specify an explicit stack size.
116    pub fn thread_builder(self, thread_builder: impl FnOnce() -> std::thread::Builder + 'static) -> Self {
117        Self {
118            thread_builder: Some(Box::new(thread_builder)),
119            ..self
120        }
121    }
122}
123
124/// The main coordinator for running jobs. It exposes methods to start and stop jobs,
125/// as well as to get the status of a job or all jobs.
126/// 
127/// Each job added to the [JobRunner] is given a dedicated thread to execute on, therefore
128/// the number of threads created by the [JobRunner] is equal to the number of jobs
129/// which are [started](JobRunner::start).
130pub struct JobRunner {
131    join_on_drop: bool,
132    jobs: HashMap<String, JobHandle>,
133}
134
135impl Default for JobRunner {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141impl JobRunner {
142    /// Initialize a new [JobRunner] with no jobs started yet.
143    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
144    pub fn new() -> Self {
145        Self {
146            join_on_drop: false,
147            jobs: HashMap::new(),
148        }
149    }
150
151    /// Allows you to configure the [JobRunner] to wait for job threads to exit
152    /// when [dropped](Drop::drop). The default value for this option is `false`,
153    /// which is equivalent to calling the [stop_all](JobRunner::stop_all) method
154    /// at drop time. Passing `true` for this option is equivalent to calling the
155    /// [join_all](JobRunner::join_all) method at drop time.
156    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
157    pub fn join_on_drop(&mut self, join_on_drop: bool) -> &mut Self {
158        self.join_on_drop = join_on_drop;
159        self
160    }
161
162    /// Gets the latest status of a specific job by the job's name.
163    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
164    pub fn status(&self, job_name: &str) -> Option<JobStatus> {
165        self.jobs.get(job_name).and_then(|handle| {
166            Some(handle.status.lock().ok()?.clone())
167        })
168    }
169
170    /// Gets an iterator over all job statuses. The iterator item tuple's first entry
171    /// is the name of the job.
172    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
173    pub fn statuses(&self) -> impl Iterator<Item = (&String, JobStatus)> {
174        self.jobs.iter().flat_map(|(name, handle)| {
175            let status = match handle.status.lock() {
176                Ok(status) => status,
177                Err(_) => return None
178            };
179            Some((name, status.clone()))
180        })
181    }
182
183    /// Request that a specific job execute immediately. How soon the job executes
184    /// depends on whether it is currently executing, or whether it is sleeping waiting
185    /// for its next regular execution. If the job is currently executing, then once the
186    /// current execution ends, the job will immediately begin executing again rather
187    /// than sleeping. If the job is currently sleeping, then the sleep will be interrupted
188    /// and the job will begin executing on its dedicated thread.
189    ///
190    /// No matter how many times method is called before the job thread is actually able to
191    /// start the next execution, the job will only execute once for all those requests. This
192    /// can happen if for example a long-running job is executing and `request_execution` is
193    /// called multiple times before the currently-executing run finishes. In that case, the
194    /// job will immediately begin executing after the current run finishes, but after that
195    /// follow up run finishes then the job will go back to its normal schedule (assuming no
196    /// other `request_execution` calls have arrived in the mean time).
197    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
198    pub fn request_execution(&self, job_name: &str) {
199        let handle = match self.jobs.get(job_name) {
200            Some(h) => h,
201            None => {
202                #[cfg(feature = "tracing")]
203                info!("No job named {} is currently registered", job_name);
204                return;
205            }
206        };
207        {
208            let mut guard = match handle.shutdown.0.lock() {
209                Ok(g) => g,
210                Err(_) => {
211                    #[cfg(feature = "tracing")]
212                    warn!("Unable to request execution of job {} because poisoned shutdown mutex lock encountered", job_name);
213                    return;
214                }
215            };
216            guard.1 = guard.1.saturating_add(1);
217        }
218        handle.shutdown.1.notify_one();
219    }
220
221    /// Registers a job and starts it executing on a dedicated thread. The job schedule's
222    /// [Schedule::next_start_delay] method will be called to determine when the
223    /// first job execution should occur.
224    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
225    pub fn start(&mut self, job: Job) -> std::io::Result<()> {
226        let status = Arc::new(Mutex::new(JobStatus::default()));
227        let shutdown = Arc::new((Mutex::new((false, 0)), Condvar::new()));
228        let thread_builder = match job.thread_builder {
229            Some(thread_builder) => thread_builder(),
230            None => std::thread::Builder::new()
231                .name(job.name.clone())
232        };
233        let join_handle = thread_builder.spawn({
234            let status = Arc::clone(&status);
235            let shutdown = Arc::clone(&shutdown);
236            let name = job.name.clone();
237            move || {
238                run_job(name, job.schedule, job.logic, status, shutdown)
239            }
240        })?;
241        let prev_handle = self.jobs.insert(job.name, JobHandle {
242            status,
243            shutdown,
244            join_handle,
245        });
246        if let Some(handle) = prev_handle {
247            handle.shutdown.0.lock().unwrap().0 = true;
248            let _ = handle.join_handle.join();
249        }
250        Ok(())
251    }
252
253    /// Signal to all jobs to stop executing. This will prevent any further job runs from
254    /// starting, but will not preemptively interrupt any currently-executing job runs.
255    /// Although the [join_all](JobRunner::join_all) method also signals all jobs to stop
256    /// executing, this method can still be useful to call at the start of application shut
257    /// down, if you have other parts of the program that you want to begin shutting down
258    /// too before calling the blocking [join_all](JobRunner::join_all) method. This method
259    /// signals, but does not block.
260    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
261    pub fn stop_all(&mut self) {
262        #[cfg(feature = "tracing")]
263        info!("Signaling {} jobs to stop", self.jobs.len());
264        for (name, handle) in &mut self.jobs {
265            #[cfg(feature = "tracing")]
266            let _span = info_span!("stop_job", job = name);
267            if let Ok(mut guard) = handle.shutdown.0.lock() {
268                if !guard.0 {
269                    #[cfg(feature = "tracing")]
270                    info!("Signaled job to shut down");
271                    guard.0 = true;
272                }
273            } else {
274                #[cfg(feature = "tracing")]
275                warn!("Received poison error when trying to acquire shutdown signal lock");
276            }
277        }
278    }
279
280    /// Signal to all jobs to stop executing and then waits for the job threads to
281    /// exit before returning. Jobs that are waiting for the next scheduled run will
282    /// exit immediately, but currently-executing jobs will be allowed to complete
283    /// their current run - the [JobRunner] does not itself define any mechanism for
284    /// preemptively interrupting running jobs. That means that how long this method
285    /// takes to execute depends on how long the slowest currently-running job takes
286    /// to finish its run. If you have particularly long-running jobs, you may want
287    /// to pass them a separate cancellation token that you call before invoking this
288    /// method.
289    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
290    pub fn join_all(&mut self) {
291        self.stop_all();
292        #[cfg(feature = "tracing")]
293        info!("Joining {} jobs", self.jobs.len());
294        for (name, handle) in self.jobs.drain() {
295            #[cfg(feature = "tracing")]
296            let _span = info_span!("join_job", job = name);
297            match handle.join_handle.join() {
298                Ok(()) => {
299                    #[cfg(feature = "tracing")]
300                    info!("Job thread exited normally");
301                },
302                Err(_) => {
303                    #[cfg(feature = "tracing")]
304                    warn!("Job thread exited with a panic");
305                }
306            }
307        }
308    }
309}
310
311impl Drop for JobRunner {
312    fn drop(&mut self) {
313        if self.join_on_drop {
314            self.join_all();
315        } else {
316            self.stop_all();
317        }
318    }
319}
320
321struct JobHandle {
322    status: Arc<Mutex<JobStatus>>,
323    shutdown: Arc<(Mutex<(bool, usize)>, Condvar)>,
324    join_handle: JoinHandle<()>,
325}
326
/// A point-in-time copy of a job's progress, as returned by [JobRunner::status] and
/// [JobRunner::statuses].
#[derive(Clone, Debug, Default)]
pub struct JobStatus {
    /// Number of runs so far. A run is counted when it starts, so a run that is still
    /// in progress is already included.
    pub runs: usize,
    /// `true` while the job logic is executing; `false` while the job is waiting for
    /// its next run.
    pub running: bool,
    /// Start time of the most recent run that has completed, or [None] if no run has
    /// completed yet.
    pub latest_start_time: Option<Instant>,
    /// End time of the most recent run that has completed, or [None] if no run has
    /// completed yet.
    pub latest_end_time: Option<Instant>,
    /// Start time of the run currently in progress; [None] whenever the job is not
    /// executing.
    pub current_start_time: Option<Instant>,
    /// When the next run is scheduled to begin; [None] while a run is executing (and
    /// before the schedule has been consulted for the first time).
    pub next_start_time: Option<Instant>,
}
349
350fn run_job(
351        name: String,
352        mut schedule: Box<dyn Schedule>,
353        mut logic: Box<dyn FnMut()>,
354        status: Arc<Mutex<JobStatus>>,
355        shutdown: Arc<(Mutex<(bool, usize)>, Condvar)>) {
356    #[cfg(feature = "tracing")]
357    let _fn_span = tracing::info_span!("run_job", job = name);
358    loop {
359        let next_start_time = {
360            #[cfg(feature = "tracing")]
361            let _span = info_span!("job_schedule");
362            #[cfg(feature = "tracing")]
363            info!("Invoking job schedule");
364            Instant::now() + schedule.next_start_delay()
365        };
366
367        // Update the JobStatus for the next start time.
368        {
369            #[cfg(feature = "tracing")]
370            let _span = info_span!("job_next_start_status_update");
371            #[cfg(feature = "tracing")]
372            info!("Updating job status for next start time schedule");
373            let mut status = match status.lock() {
374                Ok(status) => status,
375                Err(_) => {
376                    #[cfg(feature = "tracing")]
377                    warn!("Job exiting run loop due to poison error when locking status for next start time update");
378                    break;
379                }
380            };
381            status.next_start_time = Some(next_start_time);
382        };
383        
384        let sleep_result = {
385            #[cfg(feature = "tracing")]
386            let _span = info_span!("job_sleep");
387            sleep_until(next_start_time, &shutdown)
388        };
389        if sleep_result.should_exit_job() {
390            #[cfg(feature = "tracing")]
391            info!("Job run loop will exit due to sleep result {:?}", sleep_result);
392            break;
393        }
394        
395        // Update the JobStatus for the start of the current run.
396        let latest_start_time = {
397            #[cfg(feature = "tracing")]
398            let _span = info_span!("job_start_status_update");
399            #[cfg(feature = "tracing")]
400            info!("Updating job status for start of current run");
401            let now = Instant::now();
402            let mut status = match status.lock() {
403                Ok(status) => status,
404                Err(_) => {
405                    #[cfg(feature = "tracing")]
406                    warn!("Job exiting run loop due to poison error when locking status for start of job execution");
407                    break;
408                }
409            };
410            status.runs += 1;
411            status.running = true;
412            status.current_start_time = Some(now);
413            status.next_start_time = None;
414            now
415        };
416
417        // Invoke the logic.
418        {
419            #[cfg(feature = "tracing")]
420            let _span = info_span!("job_logic");
421            #[cfg(feature = "tracing")]
422            info!("Invoking job logic");
423            logic();
424        }
425
426        // Update the JobStatus for the end of the current run.
427        {
428            #[cfg(feature = "tracing")]
429            let _span = info_span!("job_end_status_update");
430            #[cfg(feature = "tracing")]
431            info!("Updating job status for end of current run");
432            let now = Instant::now();
433            let mut status = match status.lock() {
434                Ok(status) => status,
435                Err(_) => {
436                    #[cfg(feature = "tracing")]
437                    warn!("Job exiting run loop due to poison error when locking status for end of job execution");
438                    break;
439                }
440            };
441            status.running = false;
442            status.current_start_time = None;
443            status.latest_start_time = Some(latest_start_time);
444            status.latest_end_time = Some(now);
445        };
446    }
447}
448
449fn sleep_until(target_time: Instant, shutdown: &Arc<(Mutex<(bool, usize)>, Condvar)>) -> SleepResult {
450    let mut guard = match shutdown.0.lock() {
451        Ok(guard) => guard,
452        Err(_) => {
453            #[cfg(feature = "tracing")]
454            warn!("Sleep loop encountered poisoned shutdown mutex when acquiring initial shutdown signal lock, treating as shutdown signal");
455            return SleepResult::Shutdown;
456        }
457    };
458    loop {
459        let (is_shutdown, execute_requests) = *guard;
460        if is_shutdown {
461            #[cfg(feature = "tracing")]
462            info!("Sleep loop exiting early due to shutdown signal being true");
463            return SleepResult::Shutdown;
464        }
465        if execute_requests > 0 {
466            #[cfg(feature = "tracing")]
467            info!("Sleep loop exiting early due to the presence of {} execute requests, which have been reset to 0", execute_requests);
468            guard.1 = 0;
469            return SleepResult::ExecuteRequested;
470        }
471        let time_to_wait = Instant::now().saturating_duration_since(target_time);
472        if time_to_wait.is_zero() {
473            #[cfg(feature = "tracing")]
474            info!("Sleep loop finished waiting for time to pass");
475            return SleepResult::SleepFinished;
476        }
477        match shutdown.1.wait_timeout(guard, time_to_wait) {
478            Ok((g, _)) => {
479                guard = g
480            },
481            Err(_) => {
482                #[cfg(feature = "tracing")]
483                warn!("Sleep loop saw poisoned shutdown mutex while sleeping, treating as shutdown signal");
484                return SleepResult::Shutdown;
485            }
486        }
487    }
488}
489
/// Why `sleep_until` returned, which tells the job loop what to do next.
#[derive(Debug)]
enum SleepResult {
    /// Shutdown was signaled (or the shutdown mutex was poisoned); the job must exit.
    Shutdown,
    /// An immediate run was requested via `request_execution`; run the job now.
    ExecuteRequested,
    /// The scheduled start time arrived; run the job now.
    SleepFinished,
}

impl SleepResult {
    /// Whether the job's run loop should terminate instead of executing the job.
    fn should_exit_job(&self) -> bool {
        // Exhaustive on purpose: a new variant must decide explicitly.
        match *self {
            Self::Shutdown => true,
            Self::ExecuteRequested | Self::SleepFinished => false,
        }
    }
}