1use std::any::type_name;
2use std::collections::HashMap;
3use std::error::Error;
4use std::fmt::Display;
5use std::future::Future;
6use std::sync::Arc;
7use std::time::Instant;
8
9use anymap2::any::CloneAnySendSync;
10use anymap2::Map;
11use sqlx::{Pool, Postgres};
12use uuid::Uuid;
13
14use crate::hidden::{BuildFn, RunFn};
15use crate::utils::Opaque;
16use crate::{JobBuilder, JobRunnerOptions};
17
/// Boxed, sendable error type handed to the registry's error handler.
type BoxedError = Box<dyn Error + Send + 'static>;
19
/// A set of named jobs together with the error handler and the typed context
/// values that are made available when those jobs are run.
pub struct JobRegistry {
    // NOTE(review): the lint is presumably silenced because of the boxed
    // closure type below — confirm it is still needed now that the error
    // type is spelled via the `BoxedError` alias.
    #[allow(clippy::type_complexity)]
    /// Callback invoked with the job name and the error when a spawned job
    /// future resolves to `Err`, or when a job name cannot be resolved.
    error_handler: Arc<dyn Fn(&str, BoxedError) + Send + Sync>,
    /// Registered jobs, keyed by their name.
    job_map: HashMap<&'static str, &'static NamedJob>,
    /// Context values looked up by their type; written by `set_context` and
    /// read (cloned) by `context`.
    context: Map<dyn CloneAnySendSync + Send + Sync>,
}
28
/// Error value used when a job name does not correspond to any registered job.
///
/// Its `Display` output is the fixed text `Unknown job`.
#[derive(Debug)]
pub struct UnknownJobError;

impl Display for UnknownJobError {
    /// Writes the fixed message; formatter options such as width are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Unknown job")
    }
}

impl Error for UnknownJobError {}
39
40impl JobRegistry {
41 pub fn new(jobs: &[&'static NamedJob]) -> Self {
43 let mut job_map = HashMap::new();
44 for &job in jobs {
45 if job_map.insert(job.name(), job).is_some() {
46 panic!("Duplicate job registered: {}", job.name());
47 }
48 }
49 Self {
50 error_handler: Arc::new(Self::default_error_handler),
51 job_map,
52 context: Map::new(),
53 }
54 }
55
56 pub fn set_error_handler(
58 &mut self,
59 error_handler: impl Fn(&str, BoxedError) + Send + Sync + 'static,
60 ) -> &mut Self {
61 self.error_handler = Arc::new(error_handler);
62 self
63 }
64
65 pub fn set_context<C: Clone + Send + Sync + 'static>(&mut self, context: C) -> &mut Self {
67 self.context.insert(context);
68 self
69 }
70
71 pub fn context<C: Clone + Send + Sync + 'static>(&self) -> C {
73 if let Some(c) = self.context.get::<C>() {
74 c.clone()
75 } else {
76 panic!(
77 "No context of type `{}` has been provided.",
78 type_name::<C>()
79 );
80 }
81 }
82
83 pub fn resolve_job(&self, name: &str) -> Option<&'static NamedJob> {
85 self.job_map.get(name).copied()
86 }
87
88 pub fn default_error_handler(name: &str, error: BoxedError) {
90 log::error!("Job `{}` failed: {}", name, error);
91 }
92
93 #[doc(hidden)]
94 pub fn spawn_internal<E: Into<Box<dyn Error + Send + Sync + 'static>>>(
95 &self,
96 name: &'static str,
97 f: impl Future<Output = Result<(), E>> + Send + 'static,
98 ) {
99 let error_handler = self.error_handler.clone();
100 tokio::spawn(async move {
101 let start_time = Instant::now();
102 log::info!("Job `{}` started.", name);
103 if let Err(e) = f.await {
104 error_handler(name, e.into());
105 } else {
106 log::info!(
107 "Job `{}` completed in {}s.",
108 name,
109 start_time.elapsed().as_secs_f64()
110 );
111 }
112 });
113 }
114
115 pub fn runner(self, pool: &Pool<Postgres>) -> JobRunnerOptions {
118 JobRunnerOptions::new(pool, move |current_job| {
119 if let Some(job) = self.resolve_job(current_job.name()) {
120 (job.run_fn.0 .0)(&self, current_job);
121 } else {
122 (self.error_handler)(current_job.name(), Box::new(UnknownJobError))
123 }
124 })
125 }
126}
127
/// A job identified by a static name, bundling the function used to configure
/// its builder and the function used to run it.
#[derive(Debug)]
pub struct NamedJob {
    /// Name returned by `name()` and passed to the `JobBuilder` constructors.
    name: &'static str,
    /// Applied to a freshly created `JobBuilder` in `builder` and
    /// `builder_with_id`.
    build_fn: Opaque<BuildFn>,
    /// Called with the registry and the current job when the runner
    /// dispatches a job with this name.
    run_fn: Opaque<RunFn>,
}
136
137impl NamedJob {
138 #[doc(hidden)]
139 pub const fn new_internal(name: &'static str, build_fn: BuildFn, run_fn: RunFn) -> Self {
140 Self {
141 name,
142 build_fn: Opaque(build_fn),
143 run_fn: Opaque(run_fn),
144 }
145 }
146 pub fn builder(&self) -> JobBuilder<'static> {
148 let mut builder = JobBuilder::new(self.name);
149 (self.build_fn.0 .0)(&mut builder);
150 builder
151 }
152 pub fn builder_with_id(&self, id: Uuid) -> JobBuilder<'static> {
155 let mut builder = JobBuilder::new_with_id(id, self.name);
156 (self.build_fn.0 .0)(&mut builder);
157 builder
158 }
159
160 pub const fn name(&self) -> &'static str {
162 self.name
163 }
164}