use std::fmt;
use std::time::Duration;
/// Errors that an election backend can report.
#[derive(Debug)]
pub enum ElectionError {
    /// The flagship lock was not acquired.
    LockNotAcquired,
    /// A connection failure, carrying a descriptive message.
    Connection(String),
    /// Waiting for the flagship signal timed out.
    Timeout,
    /// The flagship signaled an abort.
    Aborted,
    /// A configuration problem, carrying a descriptive message.
    Config(String),
    /// The PostgreSQL election circuit breaker is open; connection attempts are suspended.
    CircuitBreakerOpen,
}

impl fmt::Display for ElectionError {
    /// Writes the human-readable message for this error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Variants carrying a payload interpolate it and return right away;
        // every other variant maps to a fixed message written once below.
        let fixed_text = match self {
            Self::Connection(detail) => return write!(f, "connection error: {detail}"),
            Self::Config(detail) => return write!(f, "configuration error: {detail}"),
            Self::LockNotAcquired => "flagship lock not acquired",
            Self::Timeout => "timeout waiting for flagship signal",
            Self::Aborted => "flagship signaled abort",
            Self::CircuitBreakerOpen => {
                "PostgreSQL election circuit breaker open - connection attempts suspended"
            }
        };
        f.write_str(fixed_text)
    }
}

impl std::error::Error for ElectionError {}
/// Status value carried by a flagship signal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlagshipSignal {
    /// Displayed as `running`.
    Running,
    /// Displayed as `ready`.
    Ready,
    /// Displayed as `abort`.
    Abort,
}

impl fmt::Display for FlagshipSignal {
    /// Writes the lowercase name of the signal.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::Running => "running",
            Self::Ready => "ready",
            Self::Abort => "abort",
        };
        // The label is emitted verbatim; width and fill flags are not applied.
        f.write_str(label)
    }
}
/// Interface implemented by election backends; implementors must be `Send + Sync`.
///
/// Each method is declared as a plain `fn` returning `impl Future + Send` rather than
/// as an `async fn`, so the `Send` bound on the returned future is part of the contract.
/// Implementations may still be written with `async fn`.
pub trait Election: Send + Sync {
/// Attempts to acquire the election for `app_name`.
///
/// Resolves to a `bool`; presumably `true` means the caller became the flagship —
/// TODO confirm against the concrete backends.
fn try_acquire(
&self,
app_name: &str,
) -> impl std::future::Future<Output = Result<bool, ElectionError>> + Send;
/// Releases whatever `try_acquire` obtained for `app_name`.
///
/// NOTE(review): behavior when nothing is held is backend-specific and not visible here.
fn release(
&self,
app_name: &str,
) -> impl std::future::Future<Output = Result<(), ElectionError>> + Send;
/// Publishes `status` as the signal for `app_name`.
fn signal(
&self,
app_name: &str,
status: FlagshipSignal,
) -> impl std::future::Future<Output = Result<(), ElectionError>> + Send;
/// Waits for a signal for `app_name`, bounded by `timeout`, and resolves to the signal received.
///
/// NOTE(review): presumably an expired `timeout` surfaces as `ElectionError::Timeout` —
/// confirm against the concrete backends.
fn wait_for_signal(
&self,
app_name: &str,
timeout: Duration,
) -> impl std::future::Future<Output = Result<FlagshipSignal, ElectionError>> + Send;
/// Reads the signal for `app_name`, resolving to `None` when the backend has none to report.
fn get_signal(
&self,
app_name: &str,
) -> impl std::future::Future<Output = Result<Option<FlagshipSignal>, ElectionError>> + Send;
}
/// Election whose flagship status is fixed when the value is constructed.
pub struct StaticElection {
    /// Flag supplied to [`StaticElection::new`].
    is_flagship: bool,
}

impl StaticElection {
    /// Creates a `StaticElection` holding the given flagship flag.
    pub fn new(is_flagship: bool) -> Self {
        StaticElection { is_flagship }
    }

    /// Returns the flagship flag supplied at construction time.
    pub fn is_flagship(&self) -> bool {
        let Self { is_flagship } = self;
        *is_flagship
    }
}
/// [`Election`] implementation driven solely by the fixed `is_flagship` flag.
///
/// None of the methods perform I/O or return an error; `app_name` is ignored throughout.
impl Election for StaticElection {
    /// Resolves to the configured flagship flag.
    async fn try_acquire(&self, _app_name: &str) -> Result<bool, ElectionError> {
        Ok(self.is_flagship)
    }

    /// No-op; always succeeds.
    async fn release(&self, _app_name: &str) -> Result<(), ElectionError> {
        Ok(())
    }

    /// No-op; the supplied status is discarded.
    async fn signal(&self, _app_name: &str, _status: FlagshipSignal) -> Result<(), ElectionError> {
        Ok(())
    }

    /// Resolves immediately to [`FlagshipSignal::Ready`]; `_timeout` is never consulted.
    async fn wait_for_signal(
        &self,
        _app_name: &str,
        _timeout: Duration,
    ) -> Result<FlagshipSignal, ElectionError> {
        // The result is the same whether or not this instance is the flagship,
        // so there is no branch on `is_flagship`.
        Ok(FlagshipSignal::Ready)
    }

    /// Always resolves to `None`; no signal is stored.
    async fn get_signal(&self, _app_name: &str) -> Result<Option<FlagshipSignal>, ElectionError> {
        Ok(None)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A flagship-configured election reports the flag and acquires.
    #[tokio::test]
    async fn test_static_election_flagship() {
        let flagship = StaticElection::new(true);
        assert!(flagship.is_flagship());

        let acquired = flagship.try_acquire("test").await.unwrap();
        assert!(acquired);
    }

    /// An escort-configured election reports the flag and does not acquire.
    #[tokio::test]
    async fn test_static_election_escort() {
        let escort = StaticElection::new(false);
        assert!(!escort.is_flagship());

        let acquired = escort.try_acquire("test").await.unwrap();
        assert!(!acquired);
    }

    /// Waiting on an escort-configured election yields `Ready`.
    #[tokio::test]
    async fn test_static_election_wait() {
        let escort = StaticElection::new(false);
        let limit = Duration::from_secs(1);

        let observed = escort.wait_for_signal("test", limit).await.unwrap();
        assert_eq!(observed, FlagshipSignal::Ready);
    }
}