floxide_redis/
liveness_store.rs1use crate::client::RedisClient;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use floxide_core::distributed::{LivenessStore, LivenessStoreError, WorkerHealth};
7use redis::AsyncCommands;
8use tracing::{error, instrument, trace};
9
10#[derive(Clone)]
12pub struct RedisLivenessStore {
13 client: RedisClient,
14}
15
16impl RedisLivenessStore {
17 pub fn new(client: RedisClient) -> Self {
19 Self { client }
20 }
21
22 fn heartbeats_key(&self) -> String {
24 self.client.prefixed_key("heartbeats")
25 }
26
27 fn health_key(&self) -> String {
29 self.client.prefixed_key("worker_health")
30 }
31
32 fn workers_key(&self) -> String {
34 self.client.prefixed_key("workers")
35 }
36}
37
38#[async_trait]
39impl LivenessStore for RedisLivenessStore {
40 #[instrument(skip(self), level = "trace")]
41 async fn update_heartbeat(
42 &self,
43 worker_id: usize,
44 timestamp: DateTime<Utc>,
45 ) -> Result<(), LivenessStoreError> {
46 let heartbeats_key = self.heartbeats_key();
47 let workers_key = self.workers_key();
48 let mut conn = self.client.conn.clone();
49
50 let _result: () = redis::pipe()
54 .hset(
55 &heartbeats_key,
56 worker_id.to_string(),
57 timestamp.to_rfc3339(),
58 )
59 .sadd(&workers_key, worker_id.to_string())
60 .query_async(&mut conn)
61 .await
62 .map_err(|e| {
63 error!("Redis error while updating heartbeat: {}", e);
64 LivenessStoreError::Io(e.to_string())
65 })?;
66
67 trace!(
68 "Updated heartbeat for worker {} to {}",
69 worker_id,
70 timestamp
71 );
72 Ok(())
73 }
74
75 #[instrument(skip(self), level = "trace")]
76 async fn get_heartbeat(
77 &self,
78 worker_id: usize,
79 ) -> Result<Option<DateTime<Utc>>, LivenessStoreError> {
80 let heartbeats_key = self.heartbeats_key();
81 let mut conn = self.client.conn.clone();
82
83 let result: Option<String> = conn
85 .hget(&heartbeats_key, worker_id.to_string())
86 .await
87 .map_err(|e| {
88 error!("Redis error while getting heartbeat: {}", e);
89 LivenessStoreError::Io(e.to_string())
90 })?;
91
92 if let Some(timestamp_str) = result {
94 let timestamp = DateTime::parse_from_rfc3339(×tamp_str)
95 .map_err(|e| {
96 error!("Failed to parse timestamp: {}", e);
97 LivenessStoreError::Other(format!("Timestamp parsing error: {}", e))
98 })?
99 .with_timezone(&Utc);
100
101 trace!("Got heartbeat for worker {}: {}", worker_id, timestamp);
102 Ok(Some(timestamp))
103 } else {
104 trace!("No heartbeat found for worker {}", worker_id);
105 Ok(None)
106 }
107 }
108
109 #[instrument(skip(self), level = "trace")]
110 async fn list_workers(&self) -> Result<Vec<usize>, LivenessStoreError> {
111 let workers_key = self.workers_key();
112 let mut conn = self.client.conn.clone();
113
114 let worker_ids: Vec<String> = conn.smembers(&workers_key).await.map_err(|e| {
116 error!("Redis error while listing workers: {}", e);
117 LivenessStoreError::Io(e.to_string())
118 })?;
119
120 let mut workers = Vec::with_capacity(worker_ids.len());
122 for id_str in worker_ids {
123 let id = id_str.parse::<usize>().map_err(|e| {
124 error!("Failed to parse worker ID: {}", e);
125 LivenessStoreError::Other(format!("Worker ID parsing error: {}", e))
126 })?;
127 workers.push(id);
128 }
129
130 trace!("Listed {} workers", workers.len());
131 Ok(workers)
132 }
133
134 #[instrument(skip(self), level = "trace")]
135 async fn update_health(
136 &self,
137 worker_id: usize,
138 health: WorkerHealth,
139 ) -> Result<(), LivenessStoreError> {
140 let health_key = self.health_key();
141 let mut conn = self.client.conn.clone();
142
143 let serialized = serde_json::to_string(&health).map_err(|e| {
145 error!("Failed to serialize health status: {}", e);
146 LivenessStoreError::Other(format!("Serialization error: {}", e))
147 })?;
148
149 let _result: () = conn
151 .hset(&health_key, worker_id.to_string(), serialized)
152 .await
153 .map_err(|e| {
154 error!("Redis error while updating health: {}", e);
155 LivenessStoreError::Io(e.to_string())
156 })?;
157
158 trace!("Updated health for worker {} to {:?}", worker_id, health);
159 Ok(())
160 }
161
162 #[instrument(skip(self), level = "trace")]
163 async fn get_health(
164 &self,
165 worker_id: usize,
166 ) -> Result<Option<WorkerHealth>, LivenessStoreError> {
167 let health_key = self.health_key();
168 let mut conn = self.client.conn.clone();
169
170 let result: Option<String> = conn
172 .hget(&health_key, worker_id.to_string())
173 .await
174 .map_err(|e| {
175 error!("Redis error while getting health: {}", e);
176 LivenessStoreError::Io(e.to_string())
177 })?;
178
179 if let Some(serialized) = result {
181 let health = serde_json::from_str(&serialized).map_err(|e| {
182 error!("Failed to deserialize health status: {}", e);
183 LivenessStoreError::Other(format!("Deserialization error: {}", e))
184 })?;
185
186 trace!("Got health for worker {}: {:?}", worker_id, health);
187 Ok(Some(health))
188 } else {
189 trace!("No health status found for worker {}", worker_id);
190 Ok(None)
191 }
192 }
193
194 #[instrument(skip(self), level = "trace")]
195 async fn list_health(&self) -> Result<Vec<WorkerHealth>, LivenessStoreError> {
196 let health_key = self.health_key();
197 let workers_key = self.workers_key();
198 let mut conn = self.client.conn.clone();
199
200 let worker_ids: Vec<String> = conn.smembers(&workers_key).await.map_err(|e| {
202 error!("Redis error while listing workers: {}", e);
203 LivenessStoreError::Io(e.to_string())
204 })?;
205
206 let mut health_statuses = Vec::with_capacity(worker_ids.len());
208 for id_str in worker_ids {
209 let result: Option<String> = conn.hget(&health_key, &id_str).await.map_err(|e| {
210 error!("Redis error while getting health: {}", e);
211 LivenessStoreError::Io(e.to_string())
212 })?;
213
214 if let Some(serialized) = result {
215 let health = serde_json::from_str(&serialized).map_err(|e| {
216 error!("Failed to deserialize health status: {}", e);
217 LivenessStoreError::Other(format!("Deserialization error: {}", e))
218 })?;
219 health_statuses.push(health);
220 }
221 }
222
223 trace!("Listed {} health statuses", health_statuses.len());
224 Ok(health_statuses)
225 }
226}