use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use sqlx::SqlitePool;
/// Timing parameters for the leadership lease.
#[derive(Debug, Clone)]
pub struct LeaderConfig {
    /// Period between two renewal attempts made by the background renewal task.
    pub renew_every: Duration,
    /// Lifetime of a lease: added to the current time to compute the stored expiry.
    pub expires_after: Duration,
}

impl Default for LeaderConfig {
    /// Renews every 30 seconds with a 60 second lease lifetime.
    fn default() -> Self {
        let renew_every = Duration::from_secs(30);
        let expires_after = Duration::from_secs(60);
        LeaderConfig {
            renew_every,
            expires_after,
        }
    }
}
/// Handle used by one process instance to compete for, keep and give up the
/// single leadership row stored in the `worker_leadership` table.
///
/// Cloning yields another handle with the same `holder` identity.
#[derive(Clone)]
pub struct LeaderElection {
/// Shared SQLite connection pool on which every leadership query is executed.
pool: Arc<SqlitePool>,
/// Identifier of this instance, written to and matched against the `holder`
/// column; generated as a ULID string by `new`.
holder: String,
/// Renewal period and lease lifetime.
cfg: LeaderConfig,
}
impl LeaderElection {
/// Creates a handle with a freshly generated ULID as its holder identity.
///
/// Nothing is read from or written to the database here; the lease is only
/// taken by a later call to `try_acquire`. As written, this constructor
/// always returns `Ok`.
pub async fn new(pool: Arc<SqlitePool>, cfg: LeaderConfig) -> anyhow::Result<Self> {
    let election = Self {
        holder: ulid::Ulid::new().to_string(),
        pool,
        cfg,
    };
    Ok(election)
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    let elapsed = since_epoch.expect("horloge système valide (post-epoch)");
    elapsed.as_millis() as i64
}
/// Tries to become (or remain) the leader for slot 0.
///
/// First attempts to insert the leadership row with `INSERT OR IGNORE`; this
/// relies on the table rejecting a second row for slot 0 (schema not visible
/// here). If a row already exists, it is overwritten only when its lease has
/// expired (`expires_at` strictly below the current time) or when it is
/// already held by this holder.
///
/// Returns `Ok(true)` when this holder owns the lease after the call and
/// `Ok(false)` when another holder still has an unexpired lease.
///
/// # Errors
///
/// Propagates any error returned by the database while executing either
/// statement.
pub async fn try_acquire(&self) -> anyhow::Result<bool> {
    let now = Self::now_ms();
    // Clamp before narrowing and saturate the sum: a very large
    // `expires_after` (e.g. `Duration::MAX`) must yield "far future", not a
    // wrapped, possibly negative, expiry.
    let ttl_ms = self.cfg.expires_after.as_millis().min(i64::MAX as u128) as i64;
    let expires = now.saturating_add(ttl_ms);

    // Case 1: nobody holds the slot yet, so the insert creates the row.
    let inserted = sqlx::query(
        "INSERT OR IGNORE INTO worker_leadership (slot, holder, expires_at) VALUES (0, ?, ?)",
    )
    .bind(&self.holder)
    .bind(expires)
    .execute(self.pool.as_ref())
    .await?;
    if inserted.rows_affected() == 1 {
        return Ok(true);
    }

    // Case 2: a row exists; take it over only if it is expired or already ours.
    let taken_over = sqlx::query(
        "UPDATE worker_leadership
SET holder = ?, expires_at = ?
WHERE slot = 0 AND (expires_at < ? OR holder = ?)",
    )
    .bind(&self.holder)
    .bind(expires)
    .bind(now)
    .bind(&self.holder)
    .execute(self.pool.as_ref())
    .await?;
    Ok(taken_over.rows_affected() == 1)
}
/// Extends this holder's lease on slot 0 to the current time plus
/// `cfg.expires_after`.
///
/// Returns `Ok(true)` when exactly one row was updated, i.e. slot 0 is still
/// recorded under this holder, and `Ok(false)` otherwise (the row is missing
/// or belongs to another holder).
///
/// # Errors
///
/// Propagates any error returned by the database while executing the update.
pub async fn renew(&self) -> anyhow::Result<bool> {
    let now = Self::now_ms();
    // Clamp before narrowing and saturate the sum so that a very large
    // `expires_after` cannot wrap into a bogus (possibly past) expiry.
    let ttl_ms = self.cfg.expires_after.as_millis().min(i64::MAX as u128) as i64;
    let expires = now.saturating_add(ttl_ms);

    let updated = sqlx::query(
        "UPDATE worker_leadership SET expires_at = ? WHERE slot = 0 AND holder = ?",
    )
    .bind(expires)
    .bind(&self.holder)
    .execute(self.pool.as_ref())
    .await?;
    Ok(updated.rows_affected() == 1)
}
pub async fn release(&self) -> anyhow::Result<()> {
sqlx::query("DELETE FROM worker_leadership WHERE holder = ?")
.bind(&self.holder)
.execute(self.pool.as_ref())
.await?;
Ok(())
}
/// Spawns a background task that renews this holder's lease every
/// `cfg.renew_every`.
///
/// Consumes the handle; the first renewal is attempted one full period after
/// the task starts. The task finishes (and the returned handle resolves) when
/// a renewal reports that the lease is no longer held, when a renewal returns
/// an error, or immediately if `cfg.renew_every` is zero.
pub fn spawn_renewal(self) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // `tokio::time::interval` panics on a zero period. Report the
        // misconfiguration instead of letting the task die with a panic that
        // is only visible to callers who await the handle.
        if self.cfg.renew_every.is_zero() {
            tracing::error!(
                holder = %self.holder,
                "intervalle de renouvellement nul — boucle de renouvellement non démarrée"
            );
            return;
        }

        let mut interval = tokio::time::interval(self.cfg.renew_every);
        // The first tick of an interval completes immediately; consume it so
        // that the loop below waits a full period before its first renewal.
        interval.tick().await;
        loop {
            interval.tick().await;
            match self.renew().await {
                Ok(true) => {
                    tracing::debug!(holder = %self.holder, "leadership renouvelé");
                }
                Ok(false) => {
                    // The row is gone or owned by someone else: stop renewing.
                    tracing::warn!(
                        holder = %self.holder,
                        "leadership perdu — arrêt de la boucle de renouvellement"
                    );
                    break;
                }
                Err(e) => {
                    // Errors also end the loop; callers can observe this
                    // through the join handle resolving.
                    tracing::error!(
                        holder = %self.holder,
                        error = %e,
                        "erreur lors du renouvellement du leadership"
                    );
                    break;
                }
            }
        }
    })
}
}