sqlxmq/
registry.rs

1use std::any::type_name;
2use std::collections::HashMap;
3use std::error::Error;
4use std::fmt::Display;
5use std::future::Future;
6use std::sync::Arc;
7use std::time::Instant;
8
9use anymap2::any::CloneAnySendSync;
10use anymap2::Map;
11use sqlx::{Pool, Postgres};
12use uuid::Uuid;
13
14use crate::hidden::{BuildFn, RunFn};
15use crate::utils::Opaque;
16use crate::{JobBuilder, JobRunnerOptions};
17
/// Owned, type-erased error handed to the registry's error handler.
///
/// Only `Send` is required (not `Sync`), so errors that arrive as
/// `Box<dyn Error + Send + Sync>` can be passed along as well.
type BoxedError = Box<dyn Error + Send + 'static>;
19
20/// Stores a mapping from job name to job. Can be used to construct
21/// a job runner.
22pub struct JobRegistry {
23    #[allow(clippy::type_complexity)]
24    error_handler: Arc<dyn Fn(&str, BoxedError) + Send + Sync>,
25    job_map: HashMap<&'static str, &'static NamedJob>,
26    context: Map<dyn CloneAnySendSync + Send + Sync>,
27}
28
/// Error reported when a received job has a name that is not present in the
/// registry.
#[derive(Debug)]
pub struct UnknownJobError;

impl Display for UnknownJobError {
    /// Writes the fixed message `Unknown job`; width and padding options of
    /// the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Unknown job")
    }
}

impl Error for UnknownJobError {}
39
40impl JobRegistry {
41    /// Construct a new job registry from the provided job list.
42    pub fn new(jobs: &[&'static NamedJob]) -> Self {
43        let mut job_map = HashMap::new();
44        for &job in jobs {
45            if job_map.insert(job.name(), job).is_some() {
46                panic!("Duplicate job registered: {}", job.name());
47            }
48        }
49        Self {
50            error_handler: Arc::new(Self::default_error_handler),
51            job_map,
52            context: Map::new(),
53        }
54    }
55
56    /// Set a function to be called whenever a job returns an error.
57    pub fn set_error_handler(
58        &mut self,
59        error_handler: impl Fn(&str, BoxedError) + Send + Sync + 'static,
60    ) -> &mut Self {
61        self.error_handler = Arc::new(error_handler);
62        self
63    }
64
65    /// Provide context for the jobs.
66    pub fn set_context<C: Clone + Send + Sync + 'static>(&mut self, context: C) -> &mut Self {
67        self.context.insert(context);
68        self
69    }
70
71    /// Access job context. Will panic if context with this type has not been provided.
72    pub fn context<C: Clone + Send + Sync + 'static>(&self) -> C {
73        if let Some(c) = self.context.get::<C>() {
74            c.clone()
75        } else {
76            panic!(
77                "No context of type `{}` has been provided.",
78                type_name::<C>()
79            );
80        }
81    }
82
83    /// Look-up a job by name.
84    pub fn resolve_job(&self, name: &str) -> Option<&'static NamedJob> {
85        self.job_map.get(name).copied()
86    }
87
88    /// The default error handler implementation, which simply logs the error.
89    pub fn default_error_handler(name: &str, error: BoxedError) {
90        log::error!("Job `{}` failed: {}", name, error);
91    }
92
93    #[doc(hidden)]
94    pub fn spawn_internal<E: Into<Box<dyn Error + Send + Sync + 'static>>>(
95        &self,
96        name: &'static str,
97        f: impl Future<Output = Result<(), E>> + Send + 'static,
98    ) {
99        let error_handler = self.error_handler.clone();
100        tokio::spawn(async move {
101            let start_time = Instant::now();
102            log::info!("Job `{}` started.", name);
103            if let Err(e) = f.await {
104                error_handler(name, e.into());
105            } else {
106                log::info!(
107                    "Job `{}` completed in {}s.",
108                    name,
109                    start_time.elapsed().as_secs_f64()
110                );
111            }
112        });
113    }
114
115    /// Construct a job runner from this registry and the provided connection
116    /// pool.
117    pub fn runner(self, pool: &Pool<Postgres>) -> JobRunnerOptions {
118        JobRunnerOptions::new(pool, move |current_job| {
119            if let Some(job) = self.resolve_job(current_job.name()) {
120                (job.run_fn.0 .0)(&self, current_job);
121            } else {
122                (self.error_handler)(current_job.name(), Box::new(UnknownJobError))
123            }
124        })
125    }
126}
127
128/// Type for a named job. Functions annotated with `#[job]` are
129/// transformed into static variables whose type is `&'static NamedJob`.
130#[derive(Debug)]
131pub struct NamedJob {
132    name: &'static str,
133    build_fn: Opaque<BuildFn>,
134    run_fn: Opaque<RunFn>,
135}
136
137impl NamedJob {
138    #[doc(hidden)]
139    pub const fn new_internal(name: &'static str, build_fn: BuildFn, run_fn: RunFn) -> Self {
140        Self {
141            name,
142            build_fn: Opaque(build_fn),
143            run_fn: Opaque(run_fn),
144        }
145    }
146    /// Initialize a job builder with the name and defaults of this job.
147    pub fn builder(&self) -> JobBuilder<'static> {
148        let mut builder = JobBuilder::new(self.name);
149        (self.build_fn.0 .0)(&mut builder);
150        builder
151    }
152    /// Initialize a job builder with the name and defaults of this job,
153    /// using the provided job ID.
154    pub fn builder_with_id(&self, id: Uuid) -> JobBuilder<'static> {
155        let mut builder = JobBuilder::new_with_id(id, self.name);
156        (self.build_fn.0 .0)(&mut builder);
157        builder
158    }
159
160    /// Returns the name of this job.
161    pub const fn name(&self) -> &'static str {
162        self.name
163    }
164}