use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use uuid::Uuid;
/// Contract for a keyed mutual-exclusion lock whose ownership lapses after a TTL.
///
/// Ownership is proven with the [`LockHandle`] returned on acquisition; `extend` and
/// `release` take that handle back. The behaviour notes on each method describe what the
/// in-memory implementation in this file does; other implementors are expected to match.
#[async_trait]
pub trait DistributedLock: Send + Sync {
/// Acquires the lock for `key`, waiting until it becomes available.
///
/// The in-file implementation retries `try_acquire` through a retry executor.
async fn acquire(&self, key: &str, ttl: Duration) -> Result<LockHandle>;
/// Makes a single acquisition attempt.
///
/// Returns `Ok(Some(handle))` on success and `Ok(None)` when `key` is currently held.
async fn try_acquire(&self, key: &str, ttl: Duration) -> Result<Option<LockHandle>>;
/// Re-arms the expiry of the lock identified by `handle` to `ttl` from now.
///
/// Returns `Ok(false)` when the lock is no longer held with `handle`'s token.
async fn extend(&self, handle: &LockHandle, ttl: Duration) -> Result<bool>;
/// Gives up the lock identified by `handle`.
///
/// Returns `Ok(true)` when an entry with a matching token was removed, `Ok(false)` otherwise.
async fn release(&self, handle: &LockHandle) -> Result<bool>;
/// Reports whether `key` is currently held by an unexpired lock.
async fn is_locked(&self, key: &str) -> Result<bool>;
/// Purges expired locks and returns how many were removed.
async fn cleanup_expired_locks(&self) -> Result<usize>;
}
/// Proof of ownership handed to the caller when a lock is acquired.
///
/// The handle is a plain value: cloning it does not acquire anything, and dropping it does
/// not release the lock (there is no `Drop` impl) — call `release` explicitly.
#[derive(Debug, Clone)]
pub struct LockHandle {
/// Key the lock was taken on.
pub key: String,
/// Random v4 UUID generated per handle; `extend`/`release` compare it with the stored entry.
pub token: Uuid,
/// UTC wall-clock time at which the handle was constructed.
pub acquired_at: chrono::DateTime<chrono::Utc>,
/// Time-to-live requested at acquisition; read by `is_expired`.
pub ttl: Duration,
}
impl LockHandle {
    /// Builds a handle for `key` with a fresh random token, stamped with the current UTC time.
    ///
    /// This only constructs the value; registering the lock is the lock implementation's job.
    pub fn new(key: String, ttl: Duration) -> Self {
        Self {
            key,
            token: Uuid::new_v4(),
            acquired_at: chrono::Utc::now(),
            ttl,
        }
    }

    /// Returns `true` once at least `ttl` has elapsed since `acquired_at`.
    ///
    /// The boundary is inclusive so that it agrees with the in-memory lock table, which
    /// only treats an entry as live while `expires_at > now`; a zero TTL is therefore
    /// always reported as expired.
    ///
    /// This is computed purely from the handle's own fields. `DistributedLock::extend`
    /// takes the handle by shared reference and does not update it, so after an extension
    /// this may return `true` while the lock is still held.
    pub fn is_expired(&self) -> bool {
        let elapsed = chrono::Utc::now()
            .signed_duration_since(self.acquired_at)
            .to_std()
            // A negative span (wall clock stepped backwards) counts as no time elapsed.
            .unwrap_or(Duration::ZERO);
        elapsed >= self.ttl
    }
}
/// [`DistributedLock`] implementation backed by a map held in this process's memory.
///
/// NOTE(review): state lives only in this value, so it presumably coordinates tasks within
/// a single process rather than across instances — confirm intended use.
pub struct InMemoryDistributedLock {
/// Lock table keyed by lock key; guarded by an async read/write lock.
locks: Arc<RwLock<HashMap<String, LockEntry>>>,
}
/// Record stored in the in-memory lock table for one held key.
#[derive(Debug, Clone)]
struct LockEntry {
/// Token of the handle that owns the lock; must match for `extend`/`release` to succeed.
token: Uuid,
// Written with a fresh random UUID on every acquisition but never read in this file,
// hence the lint suppression.
#[allow(dead_code)]
instance_id: Uuid,
/// Instant after which the entry no longer counts as held.
expires_at: chrono::DateTime<chrono::Utc>,
}
impl InMemoryDistributedLock {
    /// Creates a lock manager with an empty lock table.
    pub fn new() -> Self {
        let locks = Arc::new(RwLock::new(HashMap::new()));
        Self { locks }
    }

    /// Removes every entry whose expiry is not strictly in the future.
    ///
    /// Takes the table's write lock for the duration of the sweep and returns the number
    /// of entries that were dropped.
    async fn cleanup_expired(&self) -> usize {
        let mut table = self.locks.write().await;
        let cutoff = chrono::Utc::now();
        let mut removed = 0usize;
        table.retain(|_, entry| {
            let still_live = entry.expires_at > cutoff;
            if !still_live {
                removed += 1;
            }
            still_live
        });
        removed
    }
}
impl Default for InMemoryDistributedLock {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl DistributedLock for InMemoryDistributedLock {
    /// Acquires `key`, retrying `try_acquire` until it succeeds.
    ///
    /// A held lock is surfaced to the retry executor as a `"lock busy: <key>"` error, and
    /// the retry predicate only accepts errors whose text contains `"lock busy"`; any other
    /// failure from `try_acquire` is passed to the same predicate and does not match it.
    /// Retry pacing and limits come from `RetryConfig::persistent()`, which is defined
    /// outside this file.
    async fn acquire(&self, key: &str, ttl: Duration) -> Result<LockHandle> {
        let executor = crate::retry::RetryExecutor::new(crate::retry::RetryConfig::persistent());
        executor
            .execute_with_condition(
                "in_memory_lock_acquire",
                || {
                    let key = key.to_string();
                    async move {
                        match self.try_acquire(&key, ttl).await? {
                            Some(handle) => Ok(handle),
                            None => Err(anyhow::anyhow!("lock busy: {key}")),
                        }
                    }
                },
                |error| error.to_string().to_lowercase().contains("lock busy"),
            )
            .await
    }

    /// Attempts to take `key` once.
    ///
    /// Expired entries are purged first, so a lapsed lock never blocks acquisition.
    /// Returns `Ok(None)` when a live entry for `key` exists.
    ///
    /// # Errors
    ///
    /// Fails when `ttl` cannot be represented by chrono or when `acquired_at + ttl` falls
    /// outside chrono's date range.
    async fn try_acquire(&self, key: &str, ttl: Duration) -> Result<Option<LockHandle>> {
        let handle = LockHandle::new(key.to_string(), ttl);
        // `DateTime + Duration` panics on overflow; `from_std` accepts spans far larger
        // than a `DateTime` can hold, so use the checked form and report an error instead.
        let expires_at = handle
            .acquired_at
            .checked_add_signed(chrono::Duration::from_std(ttl)?)
            .ok_or_else(|| anyhow::anyhow!("ttl {ttl:?} exceeds the representable time range"))?;

        let mut locks = self.locks.write().await;
        let now = chrono::Utc::now();
        locks.retain(|_, entry| entry.expires_at > now);
        if locks.contains_key(key) {
            return Ok(None);
        }
        locks.insert(
            key.to_string(),
            LockEntry {
                token: handle.token,
                instance_id: Uuid::new_v4(),
                expires_at,
            },
        );
        Ok(Some(handle))
    }

    /// Moves the expiry of the lock owned by `handle` to `ttl` from now.
    ///
    /// Returns `Ok(false)` when the entry has expired, is missing, or carries a different
    /// token. The handle itself is not modified.
    ///
    /// # Errors
    ///
    /// Fails when `ttl` cannot be represented by chrono or when `now + ttl` falls outside
    /// chrono's date range.
    async fn extend(&self, handle: &LockHandle, ttl: Duration) -> Result<bool> {
        // Checked addition for the same reason as in `try_acquire`: avoid a panic on overflow.
        let new_expires_at = chrono::Utc::now()
            .checked_add_signed(chrono::Duration::from_std(ttl)?)
            .ok_or_else(|| anyhow::anyhow!("ttl {ttl:?} exceeds the representable time range"))?;

        let mut locks = self.locks.write().await;
        let now = chrono::Utc::now();
        // Purge first so that an already-expired lock cannot be revived.
        locks.retain(|_, entry| entry.expires_at > now);
        match locks.get_mut(&handle.key) {
            Some(entry) if entry.token == handle.token => {
                entry.expires_at = new_expires_at;
                Ok(true)
            }
            _ => Ok(false),
        }
    }

    /// Removes the entry for `handle.key` when its token matches `handle.token`.
    ///
    /// Expiry is not checked here: an expired entry that has not been purged yet is still
    /// removed and reported as `Ok(true)`.
    async fn release(&self, handle: &LockHandle) -> Result<bool> {
        let mut locks = self.locks.write().await;
        let owned = locks
            .get(&handle.key)
            .is_some_and(|entry| entry.token == handle.token);
        if owned {
            locks.remove(&handle.key);
        }
        Ok(owned)
    }

    /// Reports whether `key` has an entry whose expiry is still in the future.
    ///
    /// Read-only: expired entries are ignored but not removed.
    async fn is_locked(&self, key: &str) -> Result<bool> {
        let locks = self.locks.read().await;
        let now = chrono::Utc::now();
        Ok(locks.get(key).is_some_and(|entry| entry.expires_at > now))
    }

    /// Purges expired entries and returns how many were removed.
    async fn cleanup_expired_locks(&self) -> Result<usize> {
        Ok(self.cleanup_expired().await)
    }
}
/// A server message wrapped with ordering and origin metadata; serializable with serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedMessage {
/// Caller-supplied sequence number.
pub sequence_id: u64,
/// Caller-supplied instance identifier — presumably the originating server instance; confirm.
pub instance_id: Uuid,
/// UTC time at which the wrapper was constructed by `new`.
pub timestamp: chrono::DateTime<chrono::Utc>,
/// The wrapped payload.
pub message: crate::protocol::ServerMessage,
/// Optional room — presumably the delivery scope; routing is not visible in this file.
pub room_id: Option<crate::protocol::RoomId>,
/// Optional single recipient — presumably for direct delivery; confirm against consumers.
pub target_player: Option<crate::protocol::PlayerId>,
/// Players listed for exclusion; deserializes to an empty list when the field is absent.
#[serde(default)]
pub excluded_players: Vec<crate::protocol::PlayerId>,
}
impl SequencedMessage {
    /// Wraps `message` with the supplied metadata and stamps it with the current UTC time.
    ///
    /// Every argument is stored unchanged in the field of the same name; only `timestamp`
    /// is generated here.
    pub fn new(
        sequence_id: u64,
        instance_id: Uuid,
        message: crate::protocol::ServerMessage,
        room_id: Option<crate::protocol::RoomId>,
        target_player: Option<crate::protocol::PlayerId>,
        excluded_players: Vec<crate::protocol::PlayerId>,
    ) -> Self {
        let timestamp = chrono::Utc::now();
        Self {
            sequence_id,
            instance_id,
            timestamp,
            message,
            room_id,
            target_player,
            excluded_players,
        }
    }
}
/// Operating state of a [`CircuitBreaker`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CircuitState {
/// Calls are executed normally; failures are being counted.
Closed,
/// The failure threshold was reached; calls are rejected until the timeout has elapsed.
Open,
/// The timeout has elapsed; calls are admitted again and a success closes the circuit.
HalfOpen,
}
/// Mutable state of a [`CircuitBreaker`], kept behind its mutex.
struct CircuitBreakerInner {
/// Current state of the circuit.
state: CircuitState,
/// Number of failures counted so far; compared against the breaker's threshold.
failure_count: u32,
/// UTC time recorded when the circuit was last opened; `None` initially and after `reset`.
last_failure_time: Option<chrono::DateTime<chrono::Utc>>,
}
/// Guards an async operation by rejecting calls after repeated failures.
///
/// See [`CircuitState`] for the states the breaker moves through.
pub struct CircuitBreaker {
/// Shared mutable state, guarded by an async mutex.
inner: Arc<Mutex<CircuitBreakerInner>>,
/// The circuit opens once the failure count reaches this value.
failure_threshold: u32,
/// Minimum time the circuit stays open before calls are admitted again.
timeout: Duration,
}
impl CircuitBreaker {
    /// Creates a breaker in the `Closed` state with no recorded failures.
    ///
    /// `failure_threshold` is the failure count at which the circuit opens; `timeout` is
    /// how long it then stays open before calls are admitted again.
    pub fn new(failure_threshold: u32, timeout: Duration) -> Self {
        Self {
            inner: Arc::new(Mutex::new(CircuitBreakerInner {
                state: CircuitState::Closed,
                failure_count: 0,
                last_failure_time: None,
            })),
            failure_threshold,
            timeout,
        }
    }

    /// Runs `operation` under the breaker and records its outcome.
    ///
    /// * While `Open` and within `timeout` of the last trip, `operation` is dropped
    ///   without being polled and an error is returned.
    /// * While `Open` after `timeout`, the breaker moves to `HalfOpen` and the call proceeds.
    /// * A success clears the failure count (and closes the circuit if it was `HalfOpen`),
    ///   so only failures that are not separated by a success count towards the threshold.
    /// * A failure increments the count; the circuit opens when the count reaches the
    ///   threshold or when the failure happened while `HalfOpen`.
    ///
    /// The mutex is not held while `operation` is awaited.
    ///
    /// # Errors
    ///
    /// Returns `E::from(anyhow!("Circuit breaker is open"))` when the call is rejected,
    /// otherwise whatever error `operation` produced.
    pub async fn call<F, T, E>(&self, operation: F) -> Result<T, E>
    where
        F: std::future::Future<Output = Result<T, E>>,
        E: std::fmt::Debug + From<anyhow::Error>,
    {
        // Admission check; the guard is scoped so it is released before the operation runs.
        {
            let mut inner = self.inner.lock().await;
            if inner.state == CircuitState::Open {
                let cooling_down = inner.last_failure_time.is_some_and(|tripped_at| {
                    let elapsed = chrono::Utc::now()
                        .signed_duration_since(tripped_at)
                        .to_std()
                        // Clock stepped backwards: treat as no time elapsed, stay open.
                        .unwrap_or(Duration::ZERO);
                    elapsed < self.timeout
                });
                if cooling_down {
                    return Err(E::from(anyhow::anyhow!("Circuit breaker is open")));
                }
                inner.state = CircuitState::HalfOpen;
            }
        }

        match operation.await {
            Ok(value) => {
                let mut inner = self.inner.lock().await;
                match inner.state {
                    CircuitState::HalfOpen => {
                        inner.state = CircuitState::Closed;
                        inner.failure_count = 0;
                    }
                    // A success while closed ends the current failure streak; without this,
                    // isolated failures would accumulate forever and eventually trip the breaker.
                    CircuitState::Closed => inner.failure_count = 0,
                    // A call admitted before the circuit tripped finished late; the trip stands.
                    CircuitState::Open => {}
                }
                Ok(value)
            }
            Err(error) => {
                let mut inner = self.inner.lock().await;
                inner.failure_count = inner.failure_count.saturating_add(1);
                let probe_failed = inner.state == CircuitState::HalfOpen;
                if probe_failed || inner.failure_count >= self.failure_threshold {
                    inner.state = CircuitState::Open;
                    inner.last_failure_time = Some(chrono::Utc::now());
                }
                Err(error)
            }
        }
    }

    /// Returns a snapshot of the current state.
    pub async fn get_state(&self) -> CircuitState {
        self.inner.lock().await.state.clone()
    }

    /// Forces the breaker back to `Closed` and clears the failure count and trip time.
    pub async fn reset(&self) {
        let mut inner = self.inner.lock().await;
        inner.state = CircuitState::Closed;
        inner.failure_count = 0;
        inner.last_failure_time = None;
    }
}