floxide_redis/
error_store.rs

1//! Redis implementation of the ErrorStore trait.
2
3use crate::client::RedisClient;
4use async_trait::async_trait;
5use floxide_core::distributed::{ErrorStore, ErrorStoreError, WorkflowError};
6use redis::AsyncCommands;
7use tracing::{error, instrument, trace};
8
9/// Redis implementation of the ErrorStore trait.
10#[derive(Clone)]
11pub struct RedisErrorStore {
12    client: RedisClient,
13}
14
15impl RedisErrorStore {
16    /// Create a new Redis error store with the given client.
17    pub fn new(client: RedisClient) -> Self {
18        Self { client }
19    }
20
21    /// Get the Redis key for errors for a specific run.
22    fn errors_key(&self, run_id: &str) -> String {
23        self.client.prefixed_key(&format!("errors:{}", run_id))
24    }
25}
26
27#[async_trait]
28impl ErrorStore for RedisErrorStore {
29    #[instrument(skip(self, error_info), level = "trace")]
30    async fn record_error(
31        &self,
32        run_id: &str,
33        error_info: WorkflowError,
34    ) -> Result<(), ErrorStoreError> {
35        let key = self.errors_key(run_id);
36
37        // Serialize the error
38        let serialized = serde_json::to_string(&error_info).map_err(|e| {
39            error!("Failed to serialize error: {}", e);
40            ErrorStoreError::Other(format!("Serialization error: {}", e))
41        })?;
42
43        // Add the serialized error to the list in Redis
44        let mut conn = self.client.conn.clone();
45        let _result: () = conn.rpush(&key, serialized).await.map_err(|e| {
46            error!("Redis error while recording error: {}", e);
47            ErrorStoreError::Other(format!("Redis error: {}", e))
48        })?;
49
50        trace!("Recorded error for run {}", run_id);
51        Ok(())
52    }
53
54    async fn get_errors(&self, run_id: &str) -> Result<Vec<WorkflowError>, ErrorStoreError> {
55        let key = self.errors_key(run_id);
56        let mut conn = self.client.conn.clone();
57
58        // Get all errors from the list in Redis
59        let results: Vec<String> = conn.lrange(&key, 0, -1).await.map_err(|e| {
60            error!("Redis error while getting errors: {}", e);
61            ErrorStoreError::Other(format!("Redis error: {}", e))
62        })?;
63
64        // Deserialize each error
65        let mut errors = Vec::with_capacity(results.len());
66        for serialized in results {
67            let error_info = serde_json::from_str(&serialized).map_err(|e| {
68                error!("Failed to deserialize error: {}", e);
69                ErrorStoreError::Other(format!("Deserialization error: {}", e))
70            })?;
71            errors.push(error_info);
72        }
73
74        trace!("Got {} errors for run {}", errors.len(), run_id);
75        Ok(errors)
76    }
77}