floxide_redis/
run_info_store.rs

1//! Redis implementation of the RunInfoStore trait.
2
3use crate::client::RedisClient;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use floxide_core::distributed::{RunInfo, RunInfoError, RunInfoStore, RunStatus};
7use redis::AsyncCommands;
8use tracing::{error, instrument, trace};
9
10/// Redis implementation of the RunInfoStore trait.
11#[derive(Clone)]
12pub struct RedisRunInfoStore {
13    client: RedisClient,
14}
15
16impl RedisRunInfoStore {
17    /// Create a new Redis run info store with the given client.
18    pub fn new(client: RedisClient) -> Self {
19        Self { client }
20    }
21
22    /// Get the Redis key for a specific run info.
23    fn run_info_key(&self, run_id: &str) -> String {
24        self.client.prefixed_key(&format!("run_info:{}", run_id))
25    }
26
27    /// Get the Redis key for the set of all run IDs.
28    fn all_runs_key(&self) -> String {
29        self.client.prefixed_key("all_runs")
30    }
31
32    /// Get the Redis key for the set of run IDs with a specific status.
33    fn status_runs_key(&self, status: &RunStatus) -> String {
34        self.client.prefixed_key(&format!("status:{:?}", status))
35    }
36}
37
38#[async_trait]
39impl RunInfoStore for RedisRunInfoStore {
40    #[instrument(skip(self, info), level = "trace")]
41    async fn insert_run(&self, info: RunInfo) -> Result<(), RunInfoError> {
42        let run_key = self.run_info_key(&info.run_id);
43        let all_runs_key = self.all_runs_key();
44        let status_key = self.status_runs_key(&info.status);
45
46        // Convert to serializable run info
47
48        // Serialize the run info
49        let serialized = serde_json::to_string(&info).map_err(|e| {
50            error!("Failed to serialize run info: {}", e);
51            RunInfoError::Other(format!("Serialization error: {}", e))
52        })?;
53
54        // Use a Redis pipeline to atomically:
55        // 1. Store the serialized run info
56        // 2. Add the run ID to the set of all runs
57        // 3. Add the run ID to the set of runs with this status
58        let mut conn = self.client.conn.clone();
59        let _result: () = redis::pipe()
60            .set(&run_key, serialized)
61            .sadd(&all_runs_key, &info.run_id)
62            .sadd(&status_key, &info.run_id)
63            .query_async(&mut conn)
64            .await
65            .map_err(|e| {
66                error!("Redis error while inserting run info: {}", e);
67                RunInfoError::Io(e.to_string())
68            })?;
69
70        trace!("Inserted run info for run {}", info.run_id);
71        Ok(())
72    }
73
74    #[instrument(skip(self), level = "trace")]
75    async fn update_status(&self, run_id: &str, status: RunStatus) -> Result<(), RunInfoError> {
76        let run_key = self.run_info_key(run_id);
77        let mut conn = self.client.conn.clone();
78
79        // Get the current run info
80        let result: Option<String> = conn.get(&run_key).await.map_err(|e| {
81            error!("Redis error while getting run info: {}", e);
82            RunInfoError::Io(e.to_string())
83        })?;
84
85        let mut info = match result {
86            Some(serialized) => serde_json::from_str::<RunInfo>(&serialized).map_err(|e| {
87                error!("Failed to deserialize run info: {}", e);
88                RunInfoError::Other(format!("Deserialization error: {}", e))
89            })?,
90            None => return Err(RunInfoError::NotFound),
91        };
92
93        // Get the old status key
94        let old_status_key = self.status_runs_key(&info.status);
95
96        // Update the status
97        info.status = status.clone();
98
99        // Get the new status key
100        let new_status_key = self.status_runs_key(&info.status);
101
102        // Serialize the updated run info
103        let serialized = serde_json::to_string(&info).map_err(|e| {
104            error!("Failed to serialize run info: {}", e);
105            RunInfoError::Other(format!("Serialization error: {}", e))
106        })?;
107
108        // Use a Redis pipeline to atomically:
109        // 1. Store the updated run info
110        // 2. Remove the run ID from the old status set
111        // 3. Add the run ID to the new status set
112        let _result: () = redis::pipe()
113            .set(&run_key, serialized)
114            .srem(&old_status_key, run_id)
115            .sadd(&new_status_key, run_id)
116            .query_async(&mut conn)
117            .await
118            .map_err(|e| {
119                error!("Redis error while updating run status: {}", e);
120                RunInfoError::Io(e.to_string())
121            })?;
122
123        trace!("Updated status for run {} to {:?}", run_id, status);
124        Ok(())
125    }
126
127    #[instrument(skip(self), level = "trace")]
128    async fn update_finished_at(
129        &self,
130        run_id: &str,
131        finished_at: DateTime<Utc>,
132    ) -> Result<(), RunInfoError> {
133        let run_key = self.run_info_key(run_id);
134        let mut conn = self.client.conn.clone();
135
136        // Get the current run info
137        let result: Option<String> = conn.get(&run_key).await.map_err(|e| {
138            error!("Redis error while getting run info: {}", e);
139            RunInfoError::Io(e.to_string())
140        })?;
141
142        let mut info = match result {
143            Some(serialized) => serde_json::from_str::<RunInfo>(&serialized).map_err(|e| {
144                error!("Failed to deserialize run info: {}", e);
145                RunInfoError::Other(format!("Deserialization error: {}", e))
146            })?,
147            None => return Err(RunInfoError::NotFound),
148        };
149
150        // Update the finished_at timestamp
151        info.finished_at = Some(finished_at);
152
153        // Serialize the updated run info
154        let serialized = serde_json::to_string(&info).map_err(|e| {
155            error!("Failed to serialize run info: {}", e);
156            RunInfoError::Other(format!("Serialization error: {}", e))
157        })?;
158
159        // Store the updated run info
160        let _result: () = conn.set(&run_key, serialized).await.map_err(|e| {
161            error!("Redis error while updating finished_at: {}", e);
162            RunInfoError::Io(e.to_string())
163        })?;
164
165        trace!("Updated finished_at for run {} to {}", run_id, finished_at);
166        Ok(())
167    }
168
169    #[instrument(skip(self), level = "trace")]
170    async fn get_run(&self, run_id: &str) -> Result<Option<RunInfo>, RunInfoError> {
171        let run_key = self.run_info_key(run_id);
172        let mut conn = self.client.conn.clone();
173
174        // Get the run info
175        let result: Option<String> = conn.get(&run_key).await.map_err(|e| {
176            error!("Redis error while getting run info: {}", e);
177            RunInfoError::Io(e.to_string())
178        })?;
179
180        // If the run info exists, deserialize it
181        if let Some(serialized) = result {
182            let info = serde_json::from_str::<RunInfo>(&serialized).map_err(|e| {
183                error!("Failed to deserialize run info: {}", e);
184                RunInfoError::Other(format!("Deserialization error: {}", e))
185            })?;
186
187            trace!("Got run info for run {}", run_id);
188            Ok(Some(info))
189        } else {
190            trace!("No run info found for run {}", run_id);
191            Ok(None)
192        }
193    }
194
195    #[instrument(skip(self), level = "trace")]
196    async fn list_runs(&self, filter: Option<RunStatus>) -> Result<Vec<RunInfo>, RunInfoError> {
197        let mut conn = self.client.conn.clone();
198
199        // Get the set of run IDs to list
200        let run_ids: Vec<String> = match filter {
201            Some(status) => {
202                let status_key = self.status_runs_key(&status);
203                conn.smembers(&status_key).await.map_err(|e| {
204                    error!("Redis error while getting run IDs by status: {}", e);
205                    RunInfoError::Io(e.to_string())
206                })?
207            }
208            None => {
209                let all_runs_key = self.all_runs_key();
210                conn.smembers(&all_runs_key).await.map_err(|e| {
211                    error!("Redis error while getting all run IDs: {}", e);
212                    RunInfoError::Io(e.to_string())
213                })?
214            }
215        };
216
217        // Get the run info for each run ID
218        let mut runs = Vec::with_capacity(run_ids.len());
219        for run_id in run_ids {
220            let run_key = self.run_info_key(&run_id);
221            let result: Option<String> = conn.get(&run_key).await.map_err(|e| {
222                error!("Redis error while getting run info: {}", e);
223                RunInfoError::Io(e.to_string())
224            })?;
225
226            if let Some(serialized) = result {
227                let info = serde_json::from_str::<RunInfo>(&serialized).map_err(|e| {
228                    error!("Failed to deserialize run info: {}", e);
229                    RunInfoError::Other(format!("Deserialization error: {}", e))
230                })?;
231                runs.push(info);
232            }
233        }
234
235        trace!("Listed {} runs", runs.len());
236        Ok(runs)
237    }
238
239    async fn update_output(
240        &self,
241        run_id: &str,
242        output: serde_json::Value,
243    ) -> Result<(), RunInfoError> {
244        let run_key = self.run_info_key(run_id);
245        let mut conn = self.client.conn.clone();
246
247        // Get the current run info
248        let result: Option<String> = conn.get(&run_key).await.map_err(|e| {
249            error!("Redis error while getting run info: {}", e);
250            RunInfoError::Io(e.to_string())
251        })?;
252
253        let mut info = match result {
254            Some(serialized) => serde_json::from_str::<RunInfo>(&serialized).map_err(|e| {
255                error!("Failed to deserialize run info: {}", e);
256                RunInfoError::Other(format!("Deserialization error: {}", e))
257            })?,
258            None => return Err(RunInfoError::NotFound),
259        };
260
261        // Update the output field
262        info.output = Some(output);
263
264        // Serialize the updated run info
265        let serialized = serde_json::to_string(&info).map_err(|e| {
266            error!("Failed to serialize run info: {}", e);
267            RunInfoError::Other(format!("Serialization error: {}", e))
268        })?;
269
270        // Store the updated run info
271        let _result: () = conn.set(&run_key, serialized).await.map_err(|e| {
272            error!("Redis error while updating output: {}", e);
273            RunInfoError::Io(e.to_string())
274        })?;
275
276        trace!("Updated output for run {}", run_id);
277        Ok(())
278    }
279}