use crate::client::RedisClient;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use floxide_core::distributed::{LivenessStore, LivenessStoreError, WorkerHealth};
use redis::AsyncCommands;
use tracing::{error, instrument, trace};
#[derive(Clone)]
pub struct RedisLivenessStore {
client: RedisClient,
}
impl RedisLivenessStore {
pub fn new(client: RedisClient) -> Self {
Self { client }
}
fn heartbeats_key(&self) -> String {
self.client.prefixed_key("heartbeats")
}
fn health_key(&self) -> String {
self.client.prefixed_key("worker_health")
}
fn workers_key(&self) -> String {
self.client.prefixed_key("workers")
}
}
#[async_trait]
impl LivenessStore for RedisLivenessStore {
#[instrument(skip(self), level = "trace")]
async fn update_heartbeat(
&self,
worker_id: usize,
timestamp: DateTime<Utc>,
) -> Result<(), LivenessStoreError> {
let heartbeats_key = self.heartbeats_key();
let workers_key = self.workers_key();
let mut conn = self.client.conn.clone();
let _result: () = redis::pipe()
.hset(
&heartbeats_key,
worker_id.to_string(),
timestamp.to_rfc3339(),
)
.sadd(&workers_key, worker_id.to_string())
.query_async(&mut conn)
.await
.map_err(|e| {
error!("Redis error while updating heartbeat: {}", e);
LivenessStoreError::Io(e.to_string())
})?;
trace!(
"Updated heartbeat for worker {} to {}",
worker_id,
timestamp
);
Ok(())
}
#[instrument(skip(self), level = "trace")]
async fn get_heartbeat(
&self,
worker_id: usize,
) -> Result<Option<DateTime<Utc>>, LivenessStoreError> {
let heartbeats_key = self.heartbeats_key();
let mut conn = self.client.conn.clone();
let result: Option<String> = conn
.hget(&heartbeats_key, worker_id.to_string())
.await
.map_err(|e| {
error!("Redis error while getting heartbeat: {}", e);
LivenessStoreError::Io(e.to_string())
})?;
if let Some(timestamp_str) = result {
let timestamp = DateTime::parse_from_rfc3339(×tamp_str)
.map_err(|e| {
error!("Failed to parse timestamp: {}", e);
LivenessStoreError::Other(format!("Timestamp parsing error: {}", e))
})?
.with_timezone(&Utc);
trace!("Got heartbeat for worker {}: {}", worker_id, timestamp);
Ok(Some(timestamp))
} else {
trace!("No heartbeat found for worker {}", worker_id);
Ok(None)
}
}
#[instrument(skip(self), level = "trace")]
async fn list_workers(&self) -> Result<Vec<usize>, LivenessStoreError> {
let workers_key = self.workers_key();
let mut conn = self.client.conn.clone();
let worker_ids: Vec<String> = conn.smembers(&workers_key).await.map_err(|e| {
error!("Redis error while listing workers: {}", e);
LivenessStoreError::Io(e.to_string())
})?;
let mut workers = Vec::with_capacity(worker_ids.len());
for id_str in worker_ids {
let id = id_str.parse::<usize>().map_err(|e| {
error!("Failed to parse worker ID: {}", e);
LivenessStoreError::Other(format!("Worker ID parsing error: {}", e))
})?;
workers.push(id);
}
trace!("Listed {} workers", workers.len());
Ok(workers)
}
#[instrument(skip(self), level = "trace")]
async fn update_health(
&self,
worker_id: usize,
health: WorkerHealth,
) -> Result<(), LivenessStoreError> {
let health_key = self.health_key();
let mut conn = self.client.conn.clone();
let serialized = serde_json::to_string(&health).map_err(|e| {
error!("Failed to serialize health status: {}", e);
LivenessStoreError::Other(format!("Serialization error: {}", e))
})?;
let _result: () = conn
.hset(&health_key, worker_id.to_string(), serialized)
.await
.map_err(|e| {
error!("Redis error while updating health: {}", e);
LivenessStoreError::Io(e.to_string())
})?;
trace!("Updated health for worker {} to {:?}", worker_id, health);
Ok(())
}
#[instrument(skip(self), level = "trace")]
async fn get_health(
&self,
worker_id: usize,
) -> Result<Option<WorkerHealth>, LivenessStoreError> {
let health_key = self.health_key();
let mut conn = self.client.conn.clone();
let result: Option<String> = conn
.hget(&health_key, worker_id.to_string())
.await
.map_err(|e| {
error!("Redis error while getting health: {}", e);
LivenessStoreError::Io(e.to_string())
})?;
if let Some(serialized) = result {
let health = serde_json::from_str(&serialized).map_err(|e| {
error!("Failed to deserialize health status: {}", e);
LivenessStoreError::Other(format!("Deserialization error: {}", e))
})?;
trace!("Got health for worker {}: {:?}", worker_id, health);
Ok(Some(health))
} else {
trace!("No health status found for worker {}", worker_id);
Ok(None)
}
}
#[instrument(skip(self), level = "trace")]
async fn list_health(&self) -> Result<Vec<WorkerHealth>, LivenessStoreError> {
let health_key = self.health_key();
let workers_key = self.workers_key();
let mut conn = self.client.conn.clone();
let worker_ids: Vec<String> = conn.smembers(&workers_key).await.map_err(|e| {
error!("Redis error while listing workers: {}", e);
LivenessStoreError::Io(e.to_string())
})?;
let mut health_statuses = Vec::with_capacity(worker_ids.len());
for id_str in worker_ids {
let result: Option<String> = conn.hget(&health_key, &id_str).await.map_err(|e| {
error!("Redis error while getting health: {}", e);
LivenessStoreError::Io(e.to_string())
})?;
if let Some(serialized) = result {
let health = serde_json::from_str(&serialized).map_err(|e| {
error!("Failed to deserialize health status: {}", e);
LivenessStoreError::Other(format!("Deserialization error: {}", e))
})?;
health_statuses.push(health);
}
}
trace!("Listed {} health statuses", health_statuses.len());
Ok(health_statuses)
}
}