use std::future::Future;
use std::pin::Pin;
#[cfg(any(not(feature = "k8s"), test))]
use std::sync::Arc;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
#[cfg(feature = "k8s")]
pub mod k8s_lease;
/// Boxed, type-erased work callback accepted by [`LeaderElector::run_role`].
///
/// Calling it with a [`CancellationToken`] yields a pinned, boxed, `Send` future that
/// resolves to `anyhow::Result<()>`. The callback itself is `Send + Sync`, and because
/// it is an `Fn` (not an `FnOnce`) it can be invoked more than once.
///
/// Use [`work_fn`] to build one from an ordinary async closure.
pub type LeaderWorkFn = Box<
dyn Fn(CancellationToken) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
+ Send
+ Sync,
>;
pub fn work_fn<F, Fut>(f: F) -> LeaderWorkFn
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
Box::new(move |cancel| Box::pin(f(cancel)))
}
/// Abstraction for running a unit of work on behalf of a named role.
///
/// Implementors must be `Send + Sync` and implement `Debug`.
///
/// NOTE(review): the trait name suggests implementations gate `work` on winning a
/// leader election for `role`; the only implementation visible in this file
/// (`NoopLeaderElector`) performs no election and runs the work immediately.
/// Confirm the exact contract against the other implementations.
#[async_trait]
pub trait LeaderElector: Send + Sync + std::fmt::Debug {
/// Runs `work` for the given `role`.
///
/// * `role` - name of the role; how it is interpreted is up to the implementation.
/// * `cancel` - cancellation token supplied by the caller. NOTE(review): presumably
///   it signals shutdown and is forwarded to `work`; the no-op implementation passes
///   it straight through.
/// * `work` - callback that produces the future to run.
///
/// # Errors
///
/// Returns an `anyhow::Error`; which failures are reported is implementation-defined.
async fn run_role(
&self,
role: &str,
cancel: CancellationToken,
work: LeaderWorkFn,
) -> anyhow::Result<()>;
}
/// Elector that performs no election at all: its `run_role` calls the work callback
/// directly.
///
/// Compiled only when the `k8s` feature is disabled, or in test builds.
#[cfg(any(not(feature = "k8s"), test))]
#[derive(Debug)]
pub struct NoopLeaderElector;
#[cfg(any(not(feature = "k8s"), test))]
#[async_trait]
impl LeaderElector for NoopLeaderElector {
    /// Calls `work` with `cancel` right away and awaits the future it returns.
    ///
    /// The role name is ignored and the work's result is returned unchanged.
    async fn run_role(
        &self,
        _role: &str,
        cancel: CancellationToken,
        work: LeaderWorkFn,
    ) -> anyhow::Result<()> {
        let running = work(cancel);
        running.await
    }
}
/// Builds a shared, type-erased [`LeaderElector`] backed by `NoopLeaderElector`.
///
/// Compiled only when the `k8s` feature is disabled, or in test builds.
#[cfg(any(not(feature = "k8s"), test))]
#[must_use]
pub fn noop() -> Arc<dyn LeaderElector> {
    Arc::new(NoopLeaderElector) as Arc<dyn LeaderElector>
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    /// The no-op elector must invoke the work callback and surface its `Ok(())`.
    #[tokio::test]
    async fn noop_runs_work_directly() {
        let executed = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&executed);
        let elector = NoopLeaderElector;
        let cancel = CancellationToken::new();

        let joined = tokio::spawn(async move {
            elector
                .run_role(
                    "test",
                    cancel,
                    work_fn(move |_cancel| {
                        let f = Arc::clone(&flag);
                        async move {
                            f.store(true, Ordering::SeqCst);
                            Ok(())
                        }
                    }),
                )
                .await
        })
        .await;

        // Unwrap both layers: the join result (did the task panic?) and the
        // value returned by `run_role` itself. Checking only the outer layer
        // would let an `Err` from `run_role` pass unnoticed.
        joined
            .expect("run_role task panicked")
            .expect("run_role returned an error");
        assert!(executed.load(Ordering::SeqCst));
    }

    /// Work that waits on the token must keep running until the token is
    /// cancelled, and must then finish promptly and successfully.
    #[tokio::test]
    async fn noop_respects_cancellation() {
        let elector = NoopLeaderElector;
        let cancel = CancellationToken::new();
        let c = cancel.clone();
        let handle = tokio::spawn(async move {
            elector
                .run_role(
                    "test",
                    c,
                    work_fn(|cancel| async move {
                        cancel.cancelled().await;
                        Ok(())
                    }),
                )
                .await
        });

        // Give the spawned task time to start and park on `cancelled()`.
        tokio::time::sleep(Duration::from_millis(50)).await;
        // Without this check the test would also pass if `run_role` returned
        // immediately, i.e. without ever observing the cancellation.
        assert!(
            !handle.is_finished(),
            "work finished before the token was cancelled"
        );

        cancel.cancel();

        // Three layers to check: timeout elapsed, task panicked, work failed.
        tokio::time::timeout(Duration::from_secs(2), handle)
            .await
            .expect("run_role did not stop within 2s of cancellation")
            .expect("run_role task panicked")
            .expect("run_role returned an error");
    }
}