1use crate::client::RedisClient;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use floxide_core::distributed::{RunInfo, RunInfoError, RunInfoStore, RunStatus};
7use redis::AsyncCommands;
8use tracing::{error, instrument, trace};
9
/// Redis-backed implementation of [`RunInfoStore`].
///
/// Each run is stored as a JSON string under a per-run key. Run IDs are also
/// tracked in an "all runs" set and in one set per status, which are used to
/// list runs.
#[derive(Clone)]
pub struct RedisRunInfoStore {
    /// Client used to build prefixed keys and to obtain a connection handle.
    client: RedisClient,
}
15
16impl RedisRunInfoStore {
17 pub fn new(client: RedisClient) -> Self {
19 Self { client }
20 }
21
22 fn run_info_key(&self, run_id: &str) -> String {
24 self.client.prefixed_key(&format!("run_info:{}", run_id))
25 }
26
27 fn all_runs_key(&self) -> String {
29 self.client.prefixed_key("all_runs")
30 }
31
32 fn status_runs_key(&self, status: &RunStatus) -> String {
34 self.client.prefixed_key(&format!("status:{:?}", status))
35 }
36}
37
38#[async_trait]
39impl RunInfoStore for RedisRunInfoStore {
40 #[instrument(skip(self, info), level = "trace")]
41 async fn insert_run(&self, info: RunInfo) -> Result<(), RunInfoError> {
42 let run_key = self.run_info_key(&info.run_id);
43 let all_runs_key = self.all_runs_key();
44 let status_key = self.status_runs_key(&info.status);
45
46 let serialized = serde_json::to_string(&info).map_err(|e| {
50 error!("Failed to serialize run info: {}", e);
51 RunInfoError::Other(format!("Serialization error: {}", e))
52 })?;
53
54 let mut conn = self.client.conn.clone();
59 let _result: () = redis::pipe()
60 .set(&run_key, serialized)
61 .sadd(&all_runs_key, &info.run_id)
62 .sadd(&status_key, &info.run_id)
63 .query_async(&mut conn)
64 .await
65 .map_err(|e| {
66 error!("Redis error while inserting run info: {}", e);
67 RunInfoError::Io(e.to_string())
68 })?;
69
70 trace!("Inserted run info for run {}", info.run_id);
71 Ok(())
72 }
73
74 #[instrument(skip(self), level = "trace")]
75 async fn update_status(&self, run_id: &str, status: RunStatus) -> Result<(), RunInfoError> {
76 let run_key = self.run_info_key(run_id);
77 let mut conn = self.client.conn.clone();
78
79 let result: Option<String> = conn.get(&run_key).await.map_err(|e| {
81 error!("Redis error while getting run info: {}", e);
82 RunInfoError::Io(e.to_string())
83 })?;
84
85 let mut info = match result {
86 Some(serialized) => serde_json::from_str::<RunInfo>(&serialized).map_err(|e| {
87 error!("Failed to deserialize run info: {}", e);
88 RunInfoError::Other(format!("Deserialization error: {}", e))
89 })?,
90 None => return Err(RunInfoError::NotFound),
91 };
92
93 let old_status_key = self.status_runs_key(&info.status);
95
96 info.status = status.clone();
98
99 let new_status_key = self.status_runs_key(&info.status);
101
102 let serialized = serde_json::to_string(&info).map_err(|e| {
104 error!("Failed to serialize run info: {}", e);
105 RunInfoError::Other(format!("Serialization error: {}", e))
106 })?;
107
108 let _result: () = redis::pipe()
113 .set(&run_key, serialized)
114 .srem(&old_status_key, run_id)
115 .sadd(&new_status_key, run_id)
116 .query_async(&mut conn)
117 .await
118 .map_err(|e| {
119 error!("Redis error while updating run status: {}", e);
120 RunInfoError::Io(e.to_string())
121 })?;
122
123 trace!("Updated status for run {} to {:?}", run_id, status);
124 Ok(())
125 }
126
127 #[instrument(skip(self), level = "trace")]
128 async fn update_finished_at(
129 &self,
130 run_id: &str,
131 finished_at: DateTime<Utc>,
132 ) -> Result<(), RunInfoError> {
133 let run_key = self.run_info_key(run_id);
134 let mut conn = self.client.conn.clone();
135
136 let result: Option<String> = conn.get(&run_key).await.map_err(|e| {
138 error!("Redis error while getting run info: {}", e);
139 RunInfoError::Io(e.to_string())
140 })?;
141
142 let mut info = match result {
143 Some(serialized) => serde_json::from_str::<RunInfo>(&serialized).map_err(|e| {
144 error!("Failed to deserialize run info: {}", e);
145 RunInfoError::Other(format!("Deserialization error: {}", e))
146 })?,
147 None => return Err(RunInfoError::NotFound),
148 };
149
150 info.finished_at = Some(finished_at);
152
153 let serialized = serde_json::to_string(&info).map_err(|e| {
155 error!("Failed to serialize run info: {}", e);
156 RunInfoError::Other(format!("Serialization error: {}", e))
157 })?;
158
159 let _result: () = conn.set(&run_key, serialized).await.map_err(|e| {
161 error!("Redis error while updating finished_at: {}", e);
162 RunInfoError::Io(e.to_string())
163 })?;
164
165 trace!("Updated finished_at for run {} to {}", run_id, finished_at);
166 Ok(())
167 }
168
169 #[instrument(skip(self), level = "trace")]
170 async fn get_run(&self, run_id: &str) -> Result<Option<RunInfo>, RunInfoError> {
171 let run_key = self.run_info_key(run_id);
172 let mut conn = self.client.conn.clone();
173
174 let result: Option<String> = conn.get(&run_key).await.map_err(|e| {
176 error!("Redis error while getting run info: {}", e);
177 RunInfoError::Io(e.to_string())
178 })?;
179
180 if let Some(serialized) = result {
182 let info = serde_json::from_str::<RunInfo>(&serialized).map_err(|e| {
183 error!("Failed to deserialize run info: {}", e);
184 RunInfoError::Other(format!("Deserialization error: {}", e))
185 })?;
186
187 trace!("Got run info for run {}", run_id);
188 Ok(Some(info))
189 } else {
190 trace!("No run info found for run {}", run_id);
191 Ok(None)
192 }
193 }
194
195 #[instrument(skip(self), level = "trace")]
196 async fn list_runs(&self, filter: Option<RunStatus>) -> Result<Vec<RunInfo>, RunInfoError> {
197 let mut conn = self.client.conn.clone();
198
199 let run_ids: Vec<String> = match filter {
201 Some(status) => {
202 let status_key = self.status_runs_key(&status);
203 conn.smembers(&status_key).await.map_err(|e| {
204 error!("Redis error while getting run IDs by status: {}", e);
205 RunInfoError::Io(e.to_string())
206 })?
207 }
208 None => {
209 let all_runs_key = self.all_runs_key();
210 conn.smembers(&all_runs_key).await.map_err(|e| {
211 error!("Redis error while getting all run IDs: {}", e);
212 RunInfoError::Io(e.to_string())
213 })?
214 }
215 };
216
217 let mut runs = Vec::with_capacity(run_ids.len());
219 for run_id in run_ids {
220 let run_key = self.run_info_key(&run_id);
221 let result: Option<String> = conn.get(&run_key).await.map_err(|e| {
222 error!("Redis error while getting run info: {}", e);
223 RunInfoError::Io(e.to_string())
224 })?;
225
226 if let Some(serialized) = result {
227 let info = serde_json::from_str::<RunInfo>(&serialized).map_err(|e| {
228 error!("Failed to deserialize run info: {}", e);
229 RunInfoError::Other(format!("Deserialization error: {}", e))
230 })?;
231 runs.push(info);
232 }
233 }
234
235 trace!("Listed {} runs", runs.len());
236 Ok(runs)
237 }
238
239 async fn update_output(
240 &self,
241 run_id: &str,
242 output: serde_json::Value,
243 ) -> Result<(), RunInfoError> {
244 let run_key = self.run_info_key(run_id);
245 let mut conn = self.client.conn.clone();
246
247 let result: Option<String> = conn.get(&run_key).await.map_err(|e| {
249 error!("Redis error while getting run info: {}", e);
250 RunInfoError::Io(e.to_string())
251 })?;
252
253 let mut info = match result {
254 Some(serialized) => serde_json::from_str::<RunInfo>(&serialized).map_err(|e| {
255 error!("Failed to deserialize run info: {}", e);
256 RunInfoError::Other(format!("Deserialization error: {}", e))
257 })?,
258 None => return Err(RunInfoError::NotFound),
259 };
260
261 info.output = Some(output);
263
264 let serialized = serde_json::to_string(&info).map_err(|e| {
266 error!("Failed to serialize run info: {}", e);
267 RunInfoError::Other(format!("Serialization error: {}", e))
268 })?;
269
270 let _result: () = conn.set(&run_key, serialized).await.map_err(|e| {
272 error!("Redis error while updating output: {}", e);
273 RunInfoError::Io(e.to_string())
274 })?;
275
276 trace!("Updated output for run {}", run_id);
277 Ok(())
278 }
279}