use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc::Sender,
Arc,
};
use std::thread;
use std::time::{Duration, Instant};
use super::error::PacemakerStartError;
/// Collects the settings needed to start a [`Pacemaker`].
///
/// `M` is the type of message the pacemaker sends and `F` is the function
/// that produces a fresh message each time one is due. All three settings
/// start out unset and must be supplied before the pacemaker is started.
pub struct PacemakerBuilder<M: Send + 'static, F: Fn() -> M + Send + 'static> {
    /// Number of seconds to wait between two consecutive messages.
    interval: Option<u64>,
    /// Channel end the generated messages are pushed into.
    sender: Option<Sender<M>>,
    /// Function invoked to create each message that is sent.
    message_factory: Option<F>,
}
impl<M, F> PacemakerBuilder<M, F>
where
M: Send + 'static,
F: Fn() -> M + Send + 'static,
{
pub fn new() -> Self {
Self {
interval: None,
sender: None,
message_factory: None,
}
}
pub fn with_interval(mut self, interval: u64) -> Self {
self.interval = Some(interval);
self
}
pub fn with_sender(mut self, sender: Sender<M>) -> Self {
self.sender = Some(sender);
self
}
pub fn with_message_factory(mut self, message_factory: F) -> Self {
self.message_factory = Some(message_factory);
self
}
pub fn start(mut self) -> Result<Pacemaker, PacemakerStartError> {
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
let interval = self
.interval
.take()
.ok_or_else(|| PacemakerStartError("No interval provided".into()))?;
let sender = self
.sender
.take()
.ok_or_else(|| PacemakerStartError("No sender provided".into()))?;
let new_message = self
.message_factory
.take()
.ok_or_else(|| PacemakerStartError("No message factory function provided".into()))?;
let join_handle = thread::Builder::new()
.name("Pacemaker".into())
.spawn(move || {
let mut start = Instant::now();
let loop_duration = Duration::from_secs(1);
let pace_duration = Duration::from_secs(interval);
while running_clone.load(Ordering::SeqCst) {
if start.elapsed() >= pace_duration {
start = Instant::now();
if let Err(err) = sender.send(new_message()) {
warn!(
"Sender has disconnected before \
shutting down pacemaker {:?}",
err
);
break;
}
}
thread::sleep(loop_duration);
}
})
.map_err(|err| PacemakerStartError(err.to_string()))?;
Ok(Pacemaker {
join_handle,
shutdown_signaler: ShutdownSignaler { running },
})
}
}
/// Handle to a running pacemaker thread, as returned by
/// `PacemakerBuilder::start`.
pub struct Pacemaker {
/// Join handle of the spawned thread; consumed by `await_shutdown`.
join_handle: thread::JoinHandle<()>,
/// Signaler sharing the thread's running flag; cloned out by
/// `shutdown_signaler`.
shutdown_signaler: ShutdownSignaler,
}
impl Pacemaker {
pub fn builder<M, F>() -> PacemakerBuilder<M, F>
where
M: Send + 'static,
F: Fn() -> M + Send + 'static,
{
PacemakerBuilder::new()
}
pub fn shutdown_signaler(&self) -> ShutdownSignaler {
self.shutdown_signaler.clone()
}
pub fn await_shutdown(self) {
if let Err(err) = self.join_handle.join() {
error!("Failed to shutdown heartbeat monitor gracefully: {:?}", err);
}
}
}
/// Cloneable handle used to request that a pacemaker thread stop.
///
/// Every clone refers to the same shared flag, so signalling through any
/// one of them is visible to all holders.
#[derive(Clone)]
pub struct ShutdownSignaler {
    /// `true` while the thread should keep running.
    running: Arc<AtomicBool>,
}

impl ShutdownSignaler {
    /// Requests shutdown by clearing the shared running flag.
    ///
    /// This only sets the flag and returns immediately; it does not wait
    /// for the thread to exit.
    pub fn shutdown(&self) {
        let flag: &AtomicBool = &self.running;
        flag.store(false, Ordering::SeqCst);
    }
}