use crate::sync::CoordinationBackend;
use crate::sync::LeaderElector;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
/// In-memory test double whose only real state is a table of named locks.
struct MockBackend {
    /// Lock table keyed by lock name. Each entry holds the owner's identifying
    /// bytes and the expiry instant in milliseconds since the Unix epoch.
    /// Shared behind `Arc<Mutex<..>>` so tests can inspect and tamper with it.
    locks: Arc<Mutex<HashMap<String, (Vec<u8>, u64)>>>,
}

impl MockBackend {
    /// Creates a backend with an empty lock table.
    fn new() -> Self {
        // `Arc`, `Mutex` and `HashMap` all implement `Default`, which yields
        // the same empty, unlocked table as nesting the three `new()` calls.
        let locks = Arc::default();
        Self { locks }
    }
}
// Mock implementation of `CoordinationBackend`: the pub/sub and key-value
// operations are inert stubs; only the two lock operations keep state, in the
// `locks` table.
#[async_trait::async_trait]
impl CoordinationBackend for MockBackend {
    /// Ignores the message and reports success.
    async fn publish(&self, _topic: &str, _payload: &[u8]) -> Result<(), String> {
        Ok(())
    }

    /// Returns the receiving half of a fresh capacity-1 channel.
    ///
    /// The sending half is discarded on the spot, so nothing is ever
    /// delivered through the returned receiver.
    async fn subscribe(&self, _topic: &str) -> Result<mpsc::Receiver<Vec<u8>>, String> {
        let (_, receiver) = mpsc::channel(1);
        Ok(receiver)
    }

    /// Ignores the write and reports success.
    async fn set(&self, _key: &str, _value: &[u8]) -> Result<(), String> {
        Ok(())
    }

    /// Always reports that the key is absent.
    async fn get(&self, _key: &str) -> Result<Option<Vec<u8>>, String> {
        Ok(None)
    }

    /// Always reports that the compare-and-swap succeeded; nothing is stored.
    async fn cas(&self, _key: &str, _old: Option<&[u8]>, _new: &[u8]) -> Result<bool, String> {
        Ok(true)
    }

    /// Tries to take the lock `key` on behalf of `value` for `ttl_ms` milliseconds.
    ///
    /// Returns `Ok(false)` while an unexpired entry exists for `key`, whoever
    /// owns it. Otherwise the entry is created or overwritten with `value` and
    /// an expiry of now + `ttl_ms`, and `Ok(true)` is returned.
    ///
    /// Panics if the mutex is poisoned or the system clock is before the Unix epoch.
    async fn acquire_lock(&self, key: &str, value: &[u8], ttl_ms: u64) -> Result<bool, String> {
        let mut table = self.locks.lock().unwrap();
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap();
        let now_ms = since_epoch.as_millis() as u64;

        let already_held = table
            .get(key)
            .is_some_and(|(_, expires_at)| now_ms < *expires_at);
        if already_held {
            return Ok(false);
        }

        table.insert(key.to_owned(), (value.to_vec(), now_ms + ttl_ms));
        Ok(true)
    }

    /// Extends the lock `key` to now + `ttl_ms` if its stored owner equals `value`.
    ///
    /// Returns `Ok(false)` when there is no entry for `key` or the owner
    /// differs. The current expiry is not consulted, so an entry that has
    /// already lapsed is still renewed for its owner.
    ///
    /// Panics if the mutex is poisoned or the system clock is before the Unix epoch.
    async fn renew_lock(&self, key: &str, value: &[u8], ttl_ms: u64) -> Result<bool, String> {
        let mut table = self.locks.lock().unwrap();
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap();
        let now_ms = since_epoch.as_millis() as u64;

        let Some((holder, expires_at)) = table.get_mut(key) else {
            return Ok(false);
        };
        if holder.as_slice() != value {
            return Ok(false);
        }

        *expires_at = now_ms + ttl_ms;
        Ok(true)
    }
}
/// Walks a `LeaderElector` through acquire -> lose -> regain against `MockBackend`.
///
/// Leadership loss is provoked by overwriting the lock entry with a foreign
/// owner directly in the mock's table; recovery is provoked by deleting that
/// entry.
#[tokio::test]
async fn test_leader_election_lifecycle() {
    let backend = Arc::new(MockBackend::new());
    // NOTE(review): the trailing `500` is presumably the lock TTL in
    // milliseconds — confirm against `LeaderElector::new`.
    let (elector, mut rx) = LeaderElector::new(Some(backend.clone()), "test_lock".to_string(), 500);

    // Run the elector in the background. The task is detached and goes away
    // with the test runtime.
    let elector_clone = elector.clone();
    tokio::spawn(async move {
        elector_clone.start().await;
    });

    // Phase 1: wait for the first leadership notification. The wait is bounded
    // so that an elector which never reports leadership fails the test
    // instead of hanging it indefinitely.
    if !*rx.borrow() {
        tokio::time::timeout(tokio::time::Duration::from_secs(5), rx.changed())
            .await
            .expect("Timed out waiting for initial leadership")
            .unwrap();
    }
    assert!(*rx.borrow(), "Should become leader initially");

    // Phase 2: simulate another node stealing the lock by overwriting the
    // entry with a different owner and an expiry 10 s in the future. The
    // guard is scoped so the std mutex is released before the next `.await`.
    {
        let mut locks = backend.locks.lock().unwrap();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        locks.insert(
            "test_lock".to_string(),
            (b"other_node".to_vec(), now + 10000),
        );
    }
    // Give the elector time to notice: its renewal must fail on the owner
    // mismatch and re-acquisition must fail on the unexpired foreign entry.
    tokio::time::sleep(tokio::time::Duration::from_millis(600)).await;
    assert!(
        !elector.is_leader(),
        "Should lose leadership after lock theft"
    );

    // Phase 3: release the foreign lock so the elector can acquire it again.
    {
        let mut locks = backend.locks.lock().unwrap();
        locks.remove("test_lock");
    }
    tokio::time::sleep(tokio::time::Duration::from_millis(600)).await;
    assert!(
        elector.is_leader(),
        "Should regain leadership after lock release"
    );
}