Skip to main content

faucet_state_redis/
store.rs

1//! Redis-backed [`StateStore`].
2
3use async_trait::async_trait;
4use faucet_core::state::{DOCTOR_SENTINEL_KEY, StateStore, validate_state_key};
5use faucet_core::{FaucetError, Value};
6use redis::AsyncCommands;
7
8/// A `StateStore` that persists each entry as a single Redis string under
9/// `{namespace}:{key}`.
10///
11/// The connection is opened in [`RedisStateStore::connect`] and reused across
12/// all calls — `redis::aio::MultiplexedConnection` is cheaply cloneable.
13pub struct RedisStateStore {
14    namespace: String,
15    conn: redis::aio::MultiplexedConnection,
16}
17
18impl RedisStateStore {
19    /// Connect to a Redis server and namespace all keys under `namespace:`.
20    ///
21    /// `url` follows the standard Redis URL form (`redis://[:pass@]host:port/db`).
22    pub async fn connect(
23        url: impl AsRef<str>,
24        namespace: impl Into<String>,
25    ) -> Result<Self, FaucetError> {
26        let namespace = namespace.into();
27        validate_namespace(&namespace)?;
28        let client = redis::Client::open(url.as_ref())
29            .map_err(|e| FaucetError::Config(format!("invalid Redis URL: {e}")))?;
30        let conn = client
31            .get_multiplexed_async_connection()
32            .await
33            .map_err(|e| FaucetError::State(format!("Redis connection failed: {e}")))?;
34        Ok(Self { namespace, conn })
35    }
36
37    /// Construct from an existing async connection. Useful for tests and for
38    /// integrators who want to share a connection across multiple stores.
39    pub fn from_connection(
40        conn: redis::aio::MultiplexedConnection,
41        namespace: impl Into<String>,
42    ) -> Result<Self, FaucetError> {
43        let namespace = namespace.into();
44        validate_namespace(&namespace)?;
45        Ok(Self { namespace, conn })
46    }
47
48    /// Returns the fully-qualified Redis key for a given state key.
49    pub fn redis_key(&self, key: &str) -> String {
50        build_redis_key(&self.namespace, key)
51    }
52}
53
54/// Format the namespaced Redis key. Exposed as a free function so it can be
55/// unit-tested without constructing a `RedisStateStore` (which needs a real
56/// Redis connection).
57pub(crate) fn build_redis_key(namespace: &str, key: &str) -> String {
58    format!("{namespace}:{key}")
59}
60
61pub(crate) fn validate_namespace(namespace: &str) -> Result<(), FaucetError> {
62    if namespace.is_empty() {
63        return Err(FaucetError::Config(
64            "Redis state namespace must not be empty".into(),
65        ));
66    }
67    for (i, c) in namespace.char_indices() {
68        let ok = c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.');
69        if !ok {
70            return Err(FaucetError::Config(format!(
71                "Redis state namespace contains illegal character {c:?} at byte {i}"
72            )));
73        }
74    }
75    Ok(())
76}
77
78#[async_trait]
79impl StateStore for RedisStateStore {
80    async fn get(&self, key: &str) -> Result<Option<Value>, FaucetError> {
81        validate_state_key(key)?;
82        let mut conn = self.conn.clone();
83        let raw: Option<String> = conn
84            .get(self.redis_key(key))
85            .await
86            .map_err(|e| FaucetError::State(format!("Redis GET for key '{key}' failed: {e}")))?;
87        match raw {
88            None => Ok(None),
89            Some(s) => {
90                let value: Value = serde_json::from_str(&s).map_err(|e| {
91                    FaucetError::State(format!(
92                        "stored value for key '{key}' is not valid JSON: {e}"
93                    ))
94                })?;
95                Ok(Some(value))
96            }
97        }
98    }
99
100    async fn put(&self, key: &str, value: &Value) -> Result<(), FaucetError> {
101        validate_state_key(key)?;
102        let serialized = serde_json::to_string(value).map_err(|e| {
103            FaucetError::State(format!("failed to serialize state for key '{key}': {e}"))
104        })?;
105        let mut conn = self.conn.clone();
106        let _: () = conn
107            .set(self.redis_key(key), serialized)
108            .await
109            .map_err(|e| FaucetError::State(format!("Redis SET for key '{key}' failed: {e}")))?;
110        tracing::debug!(key, namespace = %self.namespace, "state written to Redis");
111        Ok(())
112    }
113
114    async fn delete(&self, key: &str) -> Result<(), FaucetError> {
115        validate_state_key(key)?;
116        let mut conn = self.conn.clone();
117        let _: i64 = conn
118            .del(self.redis_key(key))
119            .await
120            .map_err(|e| FaucetError::State(format!("Redis DEL for key '{key}' failed: {e}")))?;
121        Ok(())
122    }
123
124    async fn check(
125        &self,
126        ctx: &faucet_core::check::CheckContext,
127    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
128        use faucet_core::check::{CheckReport, Probe};
129
130        // Exercise the real put → get → delete cycle on a sentinel key. This
131        // validates connectivity, auth, and read/write permissions through the
132        // actual code path and leaves no residue.
133        let start = std::time::Instant::now();
134        let probe = match tokio::time::timeout(ctx.timeout, self.sentinel_roundtrip()).await {
135            Ok(Ok(())) => Probe::pass("sentinel", start.elapsed()),
136            Ok(Err(e)) => Probe::fail_hint(
137                "sentinel",
138                start.elapsed(),
139                e.to_string(),
140                "verify the Redis server is reachable and the credentials grant read/write access",
141            ),
142            Err(_) => Probe::fail_hint(
143                "sentinel",
144                start.elapsed(),
145                format!(
146                    "round-trip timed out after {:?}; Redis did not respond",
147                    ctx.timeout
148                ),
149                "verify the Redis server is reachable or raise the check timeout",
150            ),
151        };
152        Ok(CheckReport::single(probe))
153    }
154}
155
156impl RedisStateStore {
157    /// Write, read back, and delete a sentinel key — the body of the `check()`
158    /// probe, factored out so the happy path stays linear. Reuses the store's
159    /// own `put`/`get`/`delete`, which already namespace the key.
160    async fn sentinel_roundtrip(&self) -> Result<(), FaucetError> {
161        let probe = serde_json::json!({ "faucet_doctor": true });
162        self.put(DOCTOR_SENTINEL_KEY, &probe).await?;
163        let got = self.get(DOCTOR_SENTINEL_KEY).await?;
164        // Best-effort cleanup regardless of the read result.
165        let _ = self.delete(DOCTOR_SENTINEL_KEY).await;
166        match got {
167            Some(v) if v == probe => Ok(()),
168            _ => Err(FaucetError::State(
169                "sentinel readback did not match what was written".into(),
170            )),
171        }
172    }
173}
174
175#[cfg(test)]
176mod tests {
177    use super::*;
178
179    #[test]
180    fn build_redis_key_namespaces_consistently() {
181        assert_eq!(
182            build_redis_key("faucet", "github_issues"),
183            "faucet:github_issues"
184        );
185        assert_eq!(build_redis_key("a", "b"), "a:b");
186    }
187
188    #[test]
189    fn validate_namespace_accepts_typical_values() {
190        for ns in ["faucet", "team-1.prod", "a_b", "ABC.123"] {
191            validate_namespace(ns).unwrap_or_else(|e| panic!("expected ok for {ns:?}: {e}"));
192        }
193    }
194
195    #[test]
196    fn validate_namespace_rejects_empty() {
197        let err = validate_namespace("").unwrap_err();
198        assert!(matches!(err, FaucetError::Config(_)));
199    }
200
201    #[test]
202    fn validate_namespace_rejects_illegal_chars() {
203        for ns in ["a:b", "a/b", "a b", "hello world"] {
204            let err = validate_namespace(ns).expect_err(&format!("expected error for {ns:?}"));
205            assert!(matches!(err, FaucetError::Config(_)));
206        }
207    }
208
209    #[tokio::test]
210    async fn connect_rejects_invalid_url() {
211        let result = RedisStateStore::connect("not a url", "faucet").await;
212        match result {
213            Err(FaucetError::Config(msg)) => assert!(msg.contains("invalid Redis URL")),
214            Err(other) => panic!("expected Config error, got {other:?}"),
215            Ok(_) => panic!("expected error, got Ok"),
216        }
217    }
218
219    #[tokio::test]
220    async fn connect_rejects_invalid_namespace() {
221        let result = RedisStateStore::connect("redis://127.0.0.1:6379", "bad:namespace").await;
222        match result {
223            Err(FaucetError::Config(_)) => {}
224            Err(other) => panic!("expected Config error, got {other:?}"),
225            Ok(_) => panic!("expected error, got Ok"),
226        }
227    }
228}