1use crate::broker::{
25 broker_builder_from_url, build_and_connect, configure_task_routes, BrokerBuilder,
26};
27use crate::routing::{self, Rule};
28use crate::{
29 error::{BeatError, BrokerError},
30 protocol::MessageContentType,
31 task::{Signature, Task, TaskOptions},
32};
33use log::{debug, error, info};
34use std::time::SystemTime;
35use tokio::time::{self, Duration};
36
37mod scheduler;
38pub use scheduler::Scheduler;
39
40mod backend;
41pub use backend::{
42 LocalSchedulerBackend, RedisBackendConfig, RedisSchedulerBackend, SchedulerBackend,
43};
44
45mod schedule;
46pub use schedule::{CronSchedule, DeltaSchedule, Schedule, ScheduleDescriptor};
47
48mod scheduled_task;
49pub use scheduled_task::ScheduledTask;
50
51struct Config {
52 name: String,
53 broker_builder: Box<dyn BrokerBuilder>,
54 broker_connection_timeout: u32,
55 broker_connection_retry: bool,
56 broker_connection_max_retries: u32,
57 broker_connection_retry_delay: u32,
58 default_queue: String,
59 task_routes: Vec<(String, String)>,
60 task_options: TaskOptions,
61 max_sleep_duration: Option<Duration>,
62}
63
64pub struct BeatBuilder<Sb>
75where
76 Sb: SchedulerBackend,
77{
78 config: Config,
79 scheduler_backend: Sb,
80}
81
82impl BeatBuilder<LocalSchedulerBackend> {
83 pub fn with_default_scheduler_backend(name: &str, broker_url: &str) -> Self {
86 Self {
87 config: Config {
88 name: name.into(),
89 broker_builder: broker_builder_from_url(broker_url),
90 broker_connection_timeout: 2,
91 broker_connection_retry: true,
92 broker_connection_max_retries: 5,
93 broker_connection_retry_delay: 5,
94 default_queue: "celery".into(),
95 task_routes: vec![],
96 task_options: TaskOptions::default(),
97 max_sleep_duration: None,
98 },
99 scheduler_backend: LocalSchedulerBackend::new(),
100 }
101 }
102}
103
104impl<Sb> BeatBuilder<Sb>
105where
106 Sb: SchedulerBackend,
107{
108 pub fn with_custom_scheduler_backend(
111 name: &str,
112 broker_url: &str,
113 scheduler_backend: Sb,
114 ) -> Self {
115 Self {
116 config: Config {
117 name: name.into(),
118 broker_builder: broker_builder_from_url(broker_url),
119 broker_connection_timeout: 2,
120 broker_connection_retry: true,
121 broker_connection_max_retries: 5,
122 broker_connection_retry_delay: 5,
123 default_queue: "celery".into(),
124 task_routes: vec![],
125 task_options: TaskOptions::default(),
126 max_sleep_duration: None,
127 },
128 scheduler_backend,
129 }
130 }
131
132 pub fn default_queue(mut self, queue_name: &str) -> Self {
134 self.config.default_queue = queue_name.into();
135 self
136 }
137
138 pub fn heartbeat(mut self, heartbeat: Option<u16>) -> Self {
140 self.config.broker_builder = self.config.broker_builder.heartbeat(heartbeat);
141 self
142 }
143
144 pub fn task_route(mut self, pattern: &str, queue: &str) -> Self {
146 self.config.task_routes.push((pattern.into(), queue.into()));
147 self
148 }
149
150 pub fn broker_connection_timeout(mut self, timeout: u32) -> Self {
152 self.config.broker_connection_timeout = timeout;
153 self
154 }
155
156 pub fn broker_connection_retry(mut self, retry: bool) -> Self {
158 self.config.broker_connection_retry = retry;
159 self
160 }
161
162 pub fn broker_connection_max_retries(mut self, max_retries: u32) -> Self {
165 self.config.broker_connection_max_retries = max_retries;
166 self
167 }
168
169 pub fn broker_connection_retry_delay(mut self, retry_delay: u32) -> Self {
171 self.config.broker_connection_retry_delay = retry_delay;
172 self
173 }
174
175 pub fn task_content_type(mut self, content_type: MessageContentType) -> Self {
177 self.config.task_options.content_type = Some(content_type);
178 self
179 }
180
181 pub fn max_sleep_duration(mut self, max_sleep_duration: Duration) -> Self {
185 self.config.max_sleep_duration = Some(max_sleep_duration);
186 self
187 }
188
189 pub async fn build(self) -> Result<Beat<Sb>, BeatError> {
191 let broker_builder = self
193 .config
194 .broker_builder
195 .declare_queue(&self.config.default_queue);
196
197 let (broker_builder, task_routes) =
198 configure_task_routes(broker_builder, &self.config.task_routes)?;
199
200 let broker = build_and_connect(
201 broker_builder,
202 self.config.broker_connection_timeout,
203 if self.config.broker_connection_retry {
204 self.config.broker_connection_max_retries
205 } else {
206 0
207 },
208 self.config.broker_connection_retry_delay,
209 )
210 .await?;
211
212 let scheduler = Scheduler::new(broker);
213
214 Ok(Beat {
215 name: self.config.name,
216 scheduler,
217 scheduler_backend: self.scheduler_backend,
218 task_routes,
219 default_queue: self.config.default_queue,
220 task_options: self.config.task_options,
221 broker_connection_timeout: self.config.broker_connection_timeout,
222 broker_connection_retry: self.config.broker_connection_retry,
223 broker_connection_max_retries: self.config.broker_connection_max_retries,
224 broker_connection_retry_delay: self.config.broker_connection_retry_delay,
225 max_sleep_duration: self.config.max_sleep_duration,
226 })
227 }
228}
229
230pub struct Beat<Sb: SchedulerBackend> {
236 pub name: String,
237 pub scheduler: Scheduler,
238 pub scheduler_backend: Sb,
239
240 task_routes: Vec<Rule>,
241 default_queue: String,
242 task_options: TaskOptions,
243
244 broker_connection_timeout: u32,
245 broker_connection_retry: bool,
246 broker_connection_max_retries: u32,
247 broker_connection_retry_delay: u32,
248
249 max_sleep_duration: Option<Duration>,
250}
251
252impl Beat<LocalSchedulerBackend> {
253 pub fn default_builder(name: &str, broker_url: &str) -> BeatBuilder<LocalSchedulerBackend> {
256 BeatBuilder::<LocalSchedulerBackend>::with_default_scheduler_backend(name, broker_url)
257 }
258}
259
260impl<Sb> Beat<Sb>
261where
262 Sb: SchedulerBackend,
263{
264 pub fn custom_builder(name: &str, broker_url: &str, scheduler_backend: Sb) -> BeatBuilder<Sb> {
267 BeatBuilder::<Sb>::with_custom_scheduler_backend(name, broker_url, scheduler_backend)
268 }
269
270 pub fn schedule_task<T, S>(&mut self, signature: Signature<T>, schedule: S)
272 where
273 T: Task + Clone + 'static,
274 S: Schedule + 'static,
275 {
276 self.schedule_named_task(Signature::<T>::task_name().to_string(), signature, schedule);
277 }
278
279 pub fn schedule_named_task<T, S>(
281 &mut self,
282 name: String,
283 mut signature: Signature<T>,
284 schedule: S,
285 ) where
286 T: Task + Clone + 'static,
287 S: Schedule + 'static,
288 {
289 signature.options.update(&self.task_options);
290 let queue = match &signature.queue {
291 Some(queue) => queue.to_string(),
292 None => routing::route(T::NAME, &self.task_routes)
293 .unwrap_or(&self.default_queue)
294 .to_string(),
295 };
296 let message_factory = Box::new(signature);
297
298 self.scheduler
299 .schedule_task(name, message_factory, queue, schedule);
300 }
301
302 pub async fn start(&mut self) -> Result<(), BeatError> {
304 info!("Starting beat service");
305 loop {
306 let result = self.beat_loop().await;
307 if !self.broker_connection_retry {
308 return result;
309 }
310
311 if let Err(err) = result {
312 match err {
313 BeatError::BrokerError(broker_err) => {
314 if broker_err.is_connection_error() {
315 error!("Broker connection failed");
316 } else {
317 return Err(BeatError::BrokerError(broker_err));
318 }
319 }
320 _ => return Err(err),
321 };
322 } else {
323 return result;
324 }
325
326 let mut reconnect_successful: bool = false;
327 for _ in 0..self.broker_connection_max_retries {
328 info!("Trying to re-establish connection with broker");
329 time::sleep(Duration::from_secs(
330 self.broker_connection_retry_delay as u64,
331 ))
332 .await;
333
334 match self
335 .scheduler
336 .broker
337 .reconnect(self.broker_connection_timeout)
338 .await
339 {
340 Err(err) => {
341 if err.is_connection_error() {
342 continue;
343 }
344 return Err(BeatError::BrokerError(err));
345 }
346 Ok(_) => {
347 info!("Successfully reconnected with broker");
348 reconnect_successful = true;
349 break;
350 }
351 };
352 }
353
354 if !reconnect_successful {
355 return Err(BeatError::BrokerError(BrokerError::NotConnected));
356 }
357 }
358 }
359
360 async fn beat_loop(&mut self) -> Result<(), BeatError> {
361 loop {
362 let mut sleep_hint = None;
363 let execute_tasks = {
364 if let Some(distributed) = self.scheduler_backend.as_distributed() {
365 let decision = distributed.before_tick().await?;
366 sleep_hint = decision.sleep_hint;
367 decision.execute_tasks
368 } else {
369 true
370 }
371 };
372
373 let next_tick_at = if execute_tasks {
374 self.scheduler.tick().await?
375 } else {
376 let fallback = sleep_hint.unwrap_or_else(|| Duration::from_secs(1));
377 SystemTime::now() + fallback
378 };
379
380 let used_distributed =
381 if let Some(distributed) = self.scheduler_backend.as_distributed() {
382 distributed
383 .after_tick(self.scheduler.get_scheduled_tasks())
384 .await?;
385 true
386 } else {
387 false
388 };
389
390 if !used_distributed && self.scheduler_backend.should_sync() {
391 self.scheduler_backend
392 .sync(self.scheduler.get_scheduled_tasks())?;
393 }
394
395 let now = SystemTime::now();
396 let mut sleep_interval = if now < next_tick_at {
397 next_tick_at.duration_since(now).expect(
398 "Unexpected error when unwrapping a SystemTime comparison that is not supposed to fail",
399 )
400 } else {
401 Duration::from_millis(0)
402 };
403
404 if !execute_tasks {
405 if let Some(hint) = sleep_hint {
406 sleep_interval = hint;
407 }
408 } else if let Some(hint) = sleep_hint {
409 sleep_interval = std::cmp::min(sleep_interval, hint);
410 }
411
412 if let Some(max_sleep_duration) = &self.max_sleep_duration {
413 sleep_interval = std::cmp::min(sleep_interval, *max_sleep_duration);
414 }
415
416 if sleep_interval > Duration::from_millis(0) {
417 debug!("Now sleeping for {:?}", sleep_interval);
418 time::sleep(sleep_interval).await;
419 }
420 }
421 }
422}
423
424#[cfg(test)]
425mod tests;