floxide_redis/
liveness_store.rs

1//! Redis implementation of the LivenessStore trait.
2
3use crate::client::RedisClient;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use floxide_core::distributed::{LivenessStore, LivenessStoreError, WorkerHealth};
7use redis::AsyncCommands;
8use tracing::{error, instrument, trace};
9
10/// Redis implementation of the LivenessStore trait.
11#[derive(Clone)]
12pub struct RedisLivenessStore {
13    client: RedisClient,
14}
15
16impl RedisLivenessStore {
17    /// Create a new Redis liveness store with the given client.
18    pub fn new(client: RedisClient) -> Self {
19        Self { client }
20    }
21
22    /// Get the Redis key for heartbeats.
23    fn heartbeats_key(&self) -> String {
24        self.client.prefixed_key("heartbeats")
25    }
26
27    /// Get the Redis key for worker health.
28    fn health_key(&self) -> String {
29        self.client.prefixed_key("worker_health")
30    }
31
32    /// Get the Redis key for the set of all worker IDs.
33    fn workers_key(&self) -> String {
34        self.client.prefixed_key("workers")
35    }
36}
37
38#[async_trait]
39impl LivenessStore for RedisLivenessStore {
40    #[instrument(skip(self), level = "trace")]
41    async fn update_heartbeat(
42        &self,
43        worker_id: usize,
44        timestamp: DateTime<Utc>,
45    ) -> Result<(), LivenessStoreError> {
46        let heartbeats_key = self.heartbeats_key();
47        let workers_key = self.workers_key();
48        let mut conn = self.client.conn.clone();
49
50        // Use a Redis pipeline to atomically:
51        // 1. Store the heartbeat timestamp
52        // 2. Add the worker ID to the set of all workers
53        let _result: () = redis::pipe()
54            .hset(
55                &heartbeats_key,
56                worker_id.to_string(),
57                timestamp.to_rfc3339(),
58            )
59            .sadd(&workers_key, worker_id.to_string())
60            .query_async(&mut conn)
61            .await
62            .map_err(|e| {
63                error!("Redis error while updating heartbeat: {}", e);
64                LivenessStoreError::Io(e.to_string())
65            })?;
66
67        trace!(
68            "Updated heartbeat for worker {} to {}",
69            worker_id,
70            timestamp
71        );
72        Ok(())
73    }
74
75    #[instrument(skip(self), level = "trace")]
76    async fn get_heartbeat(
77        &self,
78        worker_id: usize,
79    ) -> Result<Option<DateTime<Utc>>, LivenessStoreError> {
80        let heartbeats_key = self.heartbeats_key();
81        let mut conn = self.client.conn.clone();
82
83        // Get the heartbeat timestamp from Redis
84        let result: Option<String> = conn
85            .hget(&heartbeats_key, worker_id.to_string())
86            .await
87            .map_err(|e| {
88                error!("Redis error while getting heartbeat: {}", e);
89                LivenessStoreError::Io(e.to_string())
90            })?;
91
92        // If the heartbeat exists, parse it
93        if let Some(timestamp_str) = result {
94            let timestamp = DateTime::parse_from_rfc3339(&timestamp_str)
95                .map_err(|e| {
96                    error!("Failed to parse timestamp: {}", e);
97                    LivenessStoreError::Other(format!("Timestamp parsing error: {}", e))
98                })?
99                .with_timezone(&Utc);
100
101            trace!("Got heartbeat for worker {}: {}", worker_id, timestamp);
102            Ok(Some(timestamp))
103        } else {
104            trace!("No heartbeat found for worker {}", worker_id);
105            Ok(None)
106        }
107    }
108
109    #[instrument(skip(self), level = "trace")]
110    async fn list_workers(&self) -> Result<Vec<usize>, LivenessStoreError> {
111        let workers_key = self.workers_key();
112        let mut conn = self.client.conn.clone();
113
114        // Get all worker IDs from Redis
115        let worker_ids: Vec<String> = conn.smembers(&workers_key).await.map_err(|e| {
116            error!("Redis error while listing workers: {}", e);
117            LivenessStoreError::Io(e.to_string())
118        })?;
119
120        // Parse each worker ID
121        let mut workers = Vec::with_capacity(worker_ids.len());
122        for id_str in worker_ids {
123            let id = id_str.parse::<usize>().map_err(|e| {
124                error!("Failed to parse worker ID: {}", e);
125                LivenessStoreError::Other(format!("Worker ID parsing error: {}", e))
126            })?;
127            workers.push(id);
128        }
129
130        trace!("Listed {} workers", workers.len());
131        Ok(workers)
132    }
133
134    #[instrument(skip(self), level = "trace")]
135    async fn update_health(
136        &self,
137        worker_id: usize,
138        health: WorkerHealth,
139    ) -> Result<(), LivenessStoreError> {
140        let health_key = self.health_key();
141        let mut conn = self.client.conn.clone();
142
143        // Serialize the health status
144        let serialized = serde_json::to_string(&health).map_err(|e| {
145            error!("Failed to serialize health status: {}", e);
146            LivenessStoreError::Other(format!("Serialization error: {}", e))
147        })?;
148
149        // Store the health status in Redis
150        let _result: () = conn
151            .hset(&health_key, worker_id.to_string(), serialized)
152            .await
153            .map_err(|e| {
154                error!("Redis error while updating health: {}", e);
155                LivenessStoreError::Io(e.to_string())
156            })?;
157
158        trace!("Updated health for worker {} to {:?}", worker_id, health);
159        Ok(())
160    }
161
162    #[instrument(skip(self), level = "trace")]
163    async fn get_health(
164        &self,
165        worker_id: usize,
166    ) -> Result<Option<WorkerHealth>, LivenessStoreError> {
167        let health_key = self.health_key();
168        let mut conn = self.client.conn.clone();
169
170        // Get the health status from Redis
171        let result: Option<String> = conn
172            .hget(&health_key, worker_id.to_string())
173            .await
174            .map_err(|e| {
175                error!("Redis error while getting health: {}", e);
176                LivenessStoreError::Io(e.to_string())
177            })?;
178
179        // If the health status exists, deserialize it
180        if let Some(serialized) = result {
181            let health = serde_json::from_str(&serialized).map_err(|e| {
182                error!("Failed to deserialize health status: {}", e);
183                LivenessStoreError::Other(format!("Deserialization error: {}", e))
184            })?;
185
186            trace!("Got health for worker {}: {:?}", worker_id, health);
187            Ok(Some(health))
188        } else {
189            trace!("No health status found for worker {}", worker_id);
190            Ok(None)
191        }
192    }
193
194    #[instrument(skip(self), level = "trace")]
195    async fn list_health(&self) -> Result<Vec<WorkerHealth>, LivenessStoreError> {
196        let health_key = self.health_key();
197        let workers_key = self.workers_key();
198        let mut conn = self.client.conn.clone();
199
200        // Get all worker IDs
201        let worker_ids: Vec<String> = conn.smembers(&workers_key).await.map_err(|e| {
202            error!("Redis error while listing workers: {}", e);
203            LivenessStoreError::Io(e.to_string())
204        })?;
205
206        // Get the health status for each worker
207        let mut health_statuses = Vec::with_capacity(worker_ids.len());
208        for id_str in worker_ids {
209            let result: Option<String> = conn.hget(&health_key, &id_str).await.map_err(|e| {
210                error!("Redis error while getting health: {}", e);
211                LivenessStoreError::Io(e.to_string())
212            })?;
213
214            if let Some(serialized) = result {
215                let health = serde_json::from_str(&serialized).map_err(|e| {
216                    error!("Failed to deserialize health status: {}", e);
217                    LivenessStoreError::Other(format!("Deserialization error: {}", e))
218                })?;
219                health_statuses.push(health);
220            }
221        }
222
223        trace!("Listed {} health statuses", health_statuses.len());
224        Ok(health_statuses)
225    }
226}