1use crate::broker::{
25 broker_builder_from_url, build_and_connect, configure_task_routes, BrokerBuilder,
26};
27use crate::routing::{self, Rule};
28use crate::{
29 error::{BeatError, BrokerError},
30 protocol::MessageContentType,
31 task::{Signature, Task, TaskOptions},
32};
33use log::{debug, error, info};
34use std::time::SystemTime;
35use tokio::time::{self, Duration};
36
37mod scheduler;
38pub use scheduler::Scheduler;
39
40mod backend;
41pub use backend::{LocalSchedulerBackend, SchedulerBackend};
42
43mod schedule;
44pub use schedule::{CronSchedule, DeltaSchedule, Schedule};
45
46mod scheduled_task;
47pub use scheduled_task::ScheduledTask;
48
49struct Config {
50 name: String,
51 broker_builder: Box<dyn BrokerBuilder>,
52 broker_connection_timeout: u32,
53 broker_connection_retry: bool,
54 broker_connection_max_retries: u32,
55 broker_connection_retry_delay: u32,
56 default_queue: String,
57 task_routes: Vec<(String, String)>,
58 task_options: TaskOptions,
59 max_sleep_duration: Option<Duration>,
60}
61
62pub struct BeatBuilder<Sb>
64where
65 Sb: SchedulerBackend,
66{
67 config: Config,
68 scheduler_backend: Sb,
69}
70
71impl BeatBuilder<LocalSchedulerBackend> {
72 pub fn with_default_scheduler_backend(name: &str, broker_url: &str) -> Self {
75 Self {
76 config: Config {
77 name: name.into(),
78 broker_builder: broker_builder_from_url(broker_url),
79 broker_connection_timeout: 2,
80 broker_connection_retry: true,
81 broker_connection_max_retries: 5,
82 broker_connection_retry_delay: 5,
83 default_queue: "celery".into(),
84 task_routes: vec![],
85 task_options: TaskOptions::default(),
86 max_sleep_duration: None,
87 },
88 scheduler_backend: LocalSchedulerBackend::new(),
89 }
90 }
91}
92
93impl<Sb> BeatBuilder<Sb>
94where
95 Sb: SchedulerBackend,
96{
97 pub fn with_custom_scheduler_backend(
100 name: &str,
101 broker_url: &str,
102 scheduler_backend: Sb,
103 ) -> Self {
104 Self {
105 config: Config {
106 name: name.into(),
107 broker_builder: broker_builder_from_url(broker_url),
108 broker_connection_timeout: 2,
109 broker_connection_retry: true,
110 broker_connection_max_retries: 5,
111 broker_connection_retry_delay: 5,
112 default_queue: "celery".into(),
113 task_routes: vec![],
114 task_options: TaskOptions::default(),
115 max_sleep_duration: None,
116 },
117 scheduler_backend,
118 }
119 }
120
121 pub fn default_queue(mut self, queue_name: &str) -> Self {
123 self.config.default_queue = queue_name.into();
124 self
125 }
126
127 pub fn heartbeat(mut self, heartbeat: Option<u16>) -> Self {
129 self.config.broker_builder = self.config.broker_builder.heartbeat(heartbeat);
130 self
131 }
132
133 pub fn task_route(mut self, pattern: &str, queue: &str) -> Self {
135 self.config.task_routes.push((pattern.into(), queue.into()));
136 self
137 }
138
139 pub fn broker_connection_timeout(mut self, timeout: u32) -> Self {
141 self.config.broker_connection_timeout = timeout;
142 self
143 }
144
145 pub fn broker_connection_retry(mut self, retry: bool) -> Self {
147 self.config.broker_connection_retry = retry;
148 self
149 }
150
151 pub fn broker_connection_max_retries(mut self, max_retries: u32) -> Self {
154 self.config.broker_connection_max_retries = max_retries;
155 self
156 }
157
158 pub fn broker_connection_retry_delay(mut self, retry_delay: u32) -> Self {
160 self.config.broker_connection_retry_delay = retry_delay;
161 self
162 }
163
164 pub fn task_content_type(mut self, content_type: MessageContentType) -> Self {
166 self.config.task_options.content_type = Some(content_type);
167 self
168 }
169
170 pub fn max_sleep_duration(mut self, max_sleep_duration: Duration) -> Self {
174 self.config.max_sleep_duration = Some(max_sleep_duration);
175 self
176 }
177
178 pub async fn build(self) -> Result<Beat<Sb>, BeatError> {
180 let broker_builder = self
182 .config
183 .broker_builder
184 .declare_queue(&self.config.default_queue);
185
186 let (broker_builder, task_routes) =
187 configure_task_routes(broker_builder, &self.config.task_routes)?;
188
189 let broker = build_and_connect(
190 broker_builder,
191 self.config.broker_connection_timeout,
192 if self.config.broker_connection_retry {
193 self.config.broker_connection_max_retries
194 } else {
195 0
196 },
197 self.config.broker_connection_retry_delay,
198 )
199 .await?;
200
201 let scheduler = Scheduler::new(broker);
202
203 Ok(Beat {
204 name: self.config.name,
205 scheduler,
206 scheduler_backend: self.scheduler_backend,
207 task_routes,
208 default_queue: self.config.default_queue,
209 task_options: self.config.task_options,
210 broker_connection_timeout: self.config.broker_connection_timeout,
211 broker_connection_retry: self.config.broker_connection_retry,
212 broker_connection_max_retries: self.config.broker_connection_max_retries,
213 broker_connection_retry_delay: self.config.broker_connection_retry_delay,
214 max_sleep_duration: self.config.max_sleep_duration,
215 })
216 }
217}
218
219pub struct Beat<Sb: SchedulerBackend> {
225 pub name: String,
226 pub scheduler: Scheduler,
227 pub scheduler_backend: Sb,
228
229 task_routes: Vec<Rule>,
230 default_queue: String,
231 task_options: TaskOptions,
232
233 broker_connection_timeout: u32,
234 broker_connection_retry: bool,
235 broker_connection_max_retries: u32,
236 broker_connection_retry_delay: u32,
237
238 max_sleep_duration: Option<Duration>,
239}
240
241impl Beat<LocalSchedulerBackend> {
242 pub fn default_builder(name: &str, broker_url: &str) -> BeatBuilder<LocalSchedulerBackend> {
245 BeatBuilder::<LocalSchedulerBackend>::with_default_scheduler_backend(name, broker_url)
246 }
247}
248
249impl<Sb> Beat<Sb>
250where
251 Sb: SchedulerBackend,
252{
253 pub fn custom_builder(name: &str, broker_url: &str, scheduler_backend: Sb) -> BeatBuilder<Sb> {
256 BeatBuilder::<Sb>::with_custom_scheduler_backend(name, broker_url, scheduler_backend)
257 }
258
259 pub fn schedule_task<T, S>(&mut self, signature: Signature<T>, schedule: S)
261 where
262 T: Task + Clone + 'static,
263 S: Schedule + 'static,
264 {
265 self.schedule_named_task(Signature::<T>::task_name().to_string(), signature, schedule);
266 }
267
268 pub fn schedule_named_task<T, S>(
270 &mut self,
271 name: String,
272 mut signature: Signature<T>,
273 schedule: S,
274 ) where
275 T: Task + Clone + 'static,
276 S: Schedule + 'static,
277 {
278 signature.options.update(&self.task_options);
279 let queue = match &signature.queue {
280 Some(queue) => queue.to_string(),
281 None => routing::route(T::NAME, &self.task_routes)
282 .unwrap_or(&self.default_queue)
283 .to_string(),
284 };
285 let message_factory = Box::new(signature);
286
287 self.scheduler
288 .schedule_task(name, message_factory, queue, schedule);
289 }
290
291 pub async fn start(&mut self) -> Result<(), BeatError> {
293 info!("Starting beat service");
294 loop {
295 let result = self.beat_loop().await;
296 if !self.broker_connection_retry {
297 return result;
298 }
299
300 if let Err(err) = result {
301 match err {
302 BeatError::BrokerError(broker_err) => {
303 if broker_err.is_connection_error() {
304 error!("Broker connection failed");
305 } else {
306 return Err(BeatError::BrokerError(broker_err));
307 }
308 }
309 _ => return Err(err),
310 };
311 } else {
312 return result;
313 }
314
315 let mut reconnect_successful: bool = false;
316 for _ in 0..self.broker_connection_max_retries {
317 info!("Trying to re-establish connection with broker");
318 time::sleep(Duration::from_secs(
319 self.broker_connection_retry_delay as u64,
320 ))
321 .await;
322
323 match self
324 .scheduler
325 .broker
326 .reconnect(self.broker_connection_timeout)
327 .await
328 {
329 Err(err) => {
330 if err.is_connection_error() {
331 continue;
332 }
333 return Err(BeatError::BrokerError(err));
334 }
335 Ok(_) => {
336 info!("Successfully reconnected with broker");
337 reconnect_successful = true;
338 break;
339 }
340 };
341 }
342
343 if !reconnect_successful {
344 return Err(BeatError::BrokerError(BrokerError::NotConnected));
345 }
346 }
347 }
348
349 async fn beat_loop(&mut self) -> Result<(), BeatError> {
350 loop {
351 let next_tick_at = self.scheduler.tick().await?;
352
353 if self.scheduler_backend.should_sync() {
354 self.scheduler_backend
355 .sync(self.scheduler.get_scheduled_tasks())?;
356 }
357
358 let now = SystemTime::now();
359 if now < next_tick_at {
360 let sleep_interval = next_tick_at.duration_since(now).expect(
361 "Unexpected error when unwrapping a SystemTime comparison that is not supposed to fail",
362 );
363 let sleep_interval = match &self.max_sleep_duration {
364 Some(max_sleep_duration) => std::cmp::min(sleep_interval, *max_sleep_duration),
365 None => sleep_interval,
366 };
367 debug!("Now sleeping for {:?}", sleep_interval);
368 time::sleep(sleep_interval).await;
369 }
370 }
371 }
372}
373
374#[cfg(test)]
375mod tests;