floxide_redis/
error_store.rs1use crate::client::RedisClient;
4use async_trait::async_trait;
5use floxide_core::distributed::{ErrorStore, ErrorStoreError, WorkflowError};
6use redis::AsyncCommands;
7use tracing::{error, instrument, trace};
8
9#[derive(Clone)]
11pub struct RedisErrorStore {
12 client: RedisClient,
13}
14
15impl RedisErrorStore {
16 pub fn new(client: RedisClient) -> Self {
18 Self { client }
19 }
20
21 fn errors_key(&self, run_id: &str) -> String {
23 self.client.prefixed_key(&format!("errors:{}", run_id))
24 }
25}
26
27#[async_trait]
28impl ErrorStore for RedisErrorStore {
29 #[instrument(skip(self, error_info), level = "trace")]
30 async fn record_error(
31 &self,
32 run_id: &str,
33 error_info: WorkflowError,
34 ) -> Result<(), ErrorStoreError> {
35 let key = self.errors_key(run_id);
36
37 let serialized = serde_json::to_string(&error_info).map_err(|e| {
39 error!("Failed to serialize error: {}", e);
40 ErrorStoreError::Other(format!("Serialization error: {}", e))
41 })?;
42
43 let mut conn = self.client.conn.clone();
45 let _result: () = conn.rpush(&key, serialized).await.map_err(|e| {
46 error!("Redis error while recording error: {}", e);
47 ErrorStoreError::Other(format!("Redis error: {}", e))
48 })?;
49
50 trace!("Recorded error for run {}", run_id);
51 Ok(())
52 }
53
54 async fn get_errors(&self, run_id: &str) -> Result<Vec<WorkflowError>, ErrorStoreError> {
55 let key = self.errors_key(run_id);
56 let mut conn = self.client.conn.clone();
57
58 let results: Vec<String> = conn.lrange(&key, 0, -1).await.map_err(|e| {
60 error!("Redis error while getting errors: {}", e);
61 ErrorStoreError::Other(format!("Redis error: {}", e))
62 })?;
63
64 let mut errors = Vec::with_capacity(results.len());
66 for serialized in results {
67 let error_info = serde_json::from_str(&serialized).map_err(|e| {
68 error!("Failed to deserialize error: {}", e);
69 ErrorStoreError::Other(format!("Deserialization error: {}", e))
70 })?;
71 errors.push(error_info);
72 }
73
74 trace!("Got {} errors for run {}", errors.len(), run_id);
75 Ok(errors)
76 }
77}