use crate::broker::{
broker_builder_from_url, build_and_connect, configure_task_routes, BrokerBuilder,
};
use crate::routing::{self, Rule};
use crate::{
error::{BeatError, BrokerError},
protocol::MessageContentType,
task::{Signature, Task, TaskOptions},
};
use log::{debug, error, info};
use std::time::SystemTime;
use tokio::time::{self, Duration};
mod scheduler;
pub use scheduler::Scheduler;
mod backend;
pub use backend::{LocalSchedulerBackend, SchedulerBackend};
mod schedule;
pub use schedule::{CronSchedule, DeltaSchedule, Schedule};
mod scheduled_task;
pub use scheduled_task::ScheduledTask;
struct Config {
name: String,
broker_builder: Box<dyn BrokerBuilder>,
broker_connection_timeout: u32,
broker_connection_retry: bool,
broker_connection_max_retries: u32,
broker_connection_retry_delay: u32,
default_queue: String,
task_routes: Vec<(String, String)>,
task_options: TaskOptions,
max_sleep_duration: Option<Duration>,
}
pub struct BeatBuilder<Sb>
where
Sb: SchedulerBackend,
{
config: Config,
scheduler_backend: Sb,
}
impl BeatBuilder<LocalSchedulerBackend> {
pub fn with_default_scheduler_backend(name: &str, broker_url: &str) -> Self {
Self {
config: Config {
name: name.into(),
broker_builder: broker_builder_from_url(broker_url),
broker_connection_timeout: 2,
broker_connection_retry: true,
broker_connection_max_retries: 5,
broker_connection_retry_delay: 5,
default_queue: "celery".into(),
task_routes: vec![],
task_options: TaskOptions::default(),
max_sleep_duration: None,
},
scheduler_backend: LocalSchedulerBackend::new(),
}
}
}
impl<Sb> BeatBuilder<Sb>
where
Sb: SchedulerBackend,
{
pub fn with_custom_scheduler_backend(
name: &str,
broker_url: &str,
scheduler_backend: Sb,
) -> Self {
Self {
config: Config {
name: name.into(),
broker_builder: broker_builder_from_url(broker_url),
broker_connection_timeout: 2,
broker_connection_retry: true,
broker_connection_max_retries: 5,
broker_connection_retry_delay: 5,
default_queue: "celery".into(),
task_routes: vec![],
task_options: TaskOptions::default(),
max_sleep_duration: None,
},
scheduler_backend,
}
}
pub fn default_queue(mut self, queue_name: &str) -> Self {
self.config.default_queue = queue_name.into();
self
}
pub fn heartbeat(mut self, heartbeat: Option<u16>) -> Self {
self.config.broker_builder = self.config.broker_builder.heartbeat(heartbeat);
self
}
pub fn task_route(mut self, pattern: &str, queue: &str) -> Self {
self.config.task_routes.push((pattern.into(), queue.into()));
self
}
pub fn broker_connection_timeout(mut self, timeout: u32) -> Self {
self.config.broker_connection_timeout = timeout;
self
}
pub fn broker_connection_retry(mut self, retry: bool) -> Self {
self.config.broker_connection_retry = retry;
self
}
pub fn broker_connection_max_retries(mut self, max_retries: u32) -> Self {
self.config.broker_connection_max_retries = max_retries;
self
}
pub fn broker_connection_retry_delay(mut self, retry_delay: u32) -> Self {
self.config.broker_connection_retry_delay = retry_delay;
self
}
pub fn task_content_type(mut self, content_type: MessageContentType) -> Self {
self.config.task_options.content_type = Some(content_type);
self
}
pub fn max_sleep_duration(mut self, max_sleep_duration: Duration) -> Self {
self.config.max_sleep_duration = Some(max_sleep_duration);
self
}
pub async fn build(self) -> Result<Beat<Sb>, BeatError> {
let broker_builder = self
.config
.broker_builder
.declare_queue(&self.config.default_queue);
let (broker_builder, task_routes) =
configure_task_routes(broker_builder, &self.config.task_routes)?;
let broker = build_and_connect(
broker_builder,
self.config.broker_connection_timeout,
if self.config.broker_connection_retry {
self.config.broker_connection_max_retries
} else {
0
},
self.config.broker_connection_retry_delay,
)
.await?;
let scheduler = Scheduler::new(broker);
Ok(Beat {
name: self.config.name,
scheduler,
scheduler_backend: self.scheduler_backend,
task_routes,
default_queue: self.config.default_queue,
task_options: self.config.task_options,
broker_connection_timeout: self.config.broker_connection_timeout,
broker_connection_retry: self.config.broker_connection_retry,
broker_connection_max_retries: self.config.broker_connection_max_retries,
broker_connection_retry_delay: self.config.broker_connection_retry_delay,
max_sleep_duration: self.config.max_sleep_duration,
})
}
}
pub struct Beat<Sb: SchedulerBackend> {
pub name: String,
pub scheduler: Scheduler,
pub scheduler_backend: Sb,
task_routes: Vec<Rule>,
default_queue: String,
task_options: TaskOptions,
broker_connection_timeout: u32,
broker_connection_retry: bool,
broker_connection_max_retries: u32,
broker_connection_retry_delay: u32,
max_sleep_duration: Option<Duration>,
}
impl Beat<LocalSchedulerBackend> {
pub fn default_builder(name: &str, broker_url: &str) -> BeatBuilder<LocalSchedulerBackend> {
BeatBuilder::<LocalSchedulerBackend>::with_default_scheduler_backend(name, broker_url)
}
}
impl<Sb> Beat<Sb>
where
Sb: SchedulerBackend,
{
pub fn custom_builder(name: &str, broker_url: &str, scheduler_backend: Sb) -> BeatBuilder<Sb> {
BeatBuilder::<Sb>::with_custom_scheduler_backend(name, broker_url, scheduler_backend)
}
pub fn schedule_task<T, S>(&mut self, signature: Signature<T>, schedule: S)
where
T: Task + Clone + 'static,
S: Schedule + 'static,
{
self.schedule_named_task(Signature::<T>::task_name().to_string(), signature, schedule);
}
pub fn schedule_named_task<T, S>(
&mut self,
name: String,
mut signature: Signature<T>,
schedule: S,
) where
T: Task + Clone + 'static,
S: Schedule + 'static,
{
signature.options.update(&self.task_options);
let queue = match &signature.queue {
Some(queue) => queue.to_string(),
None => routing::route(T::NAME, &self.task_routes)
.unwrap_or(&self.default_queue)
.to_string(),
};
let message_factory = Box::new(signature);
self.scheduler
.schedule_task(name, message_factory, queue, schedule);
}
pub async fn start(&mut self) -> Result<(), BeatError> {
info!("Starting beat service");
loop {
let result = self.beat_loop().await;
if !self.broker_connection_retry {
return result;
}
if let Err(err) = result {
match err {
BeatError::BrokerError(broker_err) => {
if broker_err.is_connection_error() {
error!("Broker connection failed");
} else {
return Err(BeatError::BrokerError(broker_err));
}
}
_ => return Err(err),
};
} else {
return result;
}
let mut reconnect_successful: bool = false;
for _ in 0..self.broker_connection_max_retries {
info!("Trying to re-establish connection with broker");
time::sleep(Duration::from_secs(
self.broker_connection_retry_delay as u64,
))
.await;
match self
.scheduler
.broker
.reconnect(self.broker_connection_timeout)
.await
{
Err(err) => {
if err.is_connection_error() {
continue;
}
return Err(BeatError::BrokerError(err));
}
Ok(_) => {
info!("Successfully reconnected with broker");
reconnect_successful = true;
break;
}
};
}
if !reconnect_successful {
return Err(BeatError::BrokerError(BrokerError::NotConnected));
}
}
}
async fn beat_loop(&mut self) -> Result<(), BeatError> {
loop {
let next_tick_at = self.scheduler.tick().await?;
if self.scheduler_backend.should_sync() {
self.scheduler_backend
.sync(self.scheduler.get_scheduled_tasks())?;
}
let now = SystemTime::now();
if now < next_tick_at {
let sleep_interval = next_tick_at.duration_since(now).expect(
"Unexpected error when unwrapping a SystemTime comparison that is not supposed to fail",
);
let sleep_interval = match &self.max_sleep_duration {
Some(max_sleep_duration) => std::cmp::min(sleep_interval, *max_sleep_duration),
None => sleep_interval,
};
debug!("Now sleeping for {:?}", sleep_interval);
time::sleep(sleep_interval).await;
}
}
}
}
#[cfg(test)]
mod tests;