floxide_redis/
metrics_store.rs

1//! Redis implementation of the MetricsStore trait.
2
3use crate::client::RedisClient;
4use async_trait::async_trait;
5use floxide_core::distributed::{MetricsError, MetricsStore, RunMetrics};
6use redis::AsyncCommands;
7use tracing::{error, instrument, trace};
8
9/// Redis implementation of the MetricsStore trait.
10#[derive(Clone)]
11pub struct RedisMetricsStore {
12    client: RedisClient,
13}
14
15impl RedisMetricsStore {
16    /// Create a new Redis metrics store with the given client.
17    pub fn new(client: RedisClient) -> Self {
18        Self { client }
19    }
20
21    /// Get the Redis key for metrics for a specific run.
22    fn metrics_key(&self, run_id: &str) -> String {
23        self.client.prefixed_key(&format!("metrics:{}", run_id))
24    }
25}
26
27#[async_trait]
28impl MetricsStore for RedisMetricsStore {
29    #[instrument(skip(self, metrics), level = "trace")]
30    async fn update_metrics(&self, run_id: &str, metrics: RunMetrics) -> Result<(), MetricsError> {
31        let key = self.metrics_key(run_id);
32
33        // Serialize the metrics
34        let serialized = serde_json::to_string(&metrics).map_err(|e| {
35            error!("Failed to serialize metrics: {}", e);
36            MetricsError::Other(format!("Serialization error: {}", e))
37        })?;
38
39        // Store the serialized metrics in Redis
40        let mut conn = self.client.conn.clone();
41        let _result: () = conn.set(&key, serialized).await.map_err(|e| {
42            error!("Redis error while updating metrics: {}", e);
43            MetricsError::Other(format!("Redis error: {}", e))
44        })?;
45
46        trace!("Updated metrics for run {}", run_id);
47        Ok(())
48    }
49
50    #[instrument(skip(self), level = "trace")]
51    async fn get_metrics(&self, run_id: &str) -> Result<Option<RunMetrics>, MetricsError> {
52        let key = self.metrics_key(run_id);
53        let mut conn = self.client.conn.clone();
54
55        // Get the serialized metrics from Redis
56        let result: Option<String> = conn.get(&key).await.map_err(|e| {
57            error!("Redis error while getting metrics: {}", e);
58            MetricsError::Other(format!("Redis error: {}", e))
59        })?;
60
61        // If the metrics exist, deserialize them
62        if let Some(serialized) = result {
63            let metrics = serde_json::from_str(&serialized).map_err(|e| {
64                error!("Failed to deserialize metrics: {}", e);
65                MetricsError::Other(format!("Deserialization error: {}", e))
66            })?;
67
68            trace!("Got metrics for run {}", run_id);
69            Ok(Some(metrics))
70        } else {
71            trace!("No metrics found for run {}", run_id);
72            Ok(None)
73        }
74    }
75}