faucet_state_redis/
store.rs1use async_trait::async_trait;
4use faucet_core::state::{DOCTOR_SENTINEL_KEY, StateStore, validate_state_key};
5use faucet_core::{FaucetError, Value};
6use redis::AsyncCommands;
7
8pub struct RedisStateStore {
14 namespace: String,
15 conn: redis::aio::MultiplexedConnection,
16}
17
18impl RedisStateStore {
19 pub async fn connect(
23 url: impl AsRef<str>,
24 namespace: impl Into<String>,
25 ) -> Result<Self, FaucetError> {
26 let namespace = namespace.into();
27 validate_namespace(&namespace)?;
28 let client = redis::Client::open(url.as_ref())
29 .map_err(|e| FaucetError::Config(format!("invalid Redis URL: {e}")))?;
30 let conn = client
31 .get_multiplexed_async_connection()
32 .await
33 .map_err(|e| FaucetError::State(format!("Redis connection failed: {e}")))?;
34 Ok(Self { namespace, conn })
35 }
36
37 pub fn from_connection(
40 conn: redis::aio::MultiplexedConnection,
41 namespace: impl Into<String>,
42 ) -> Result<Self, FaucetError> {
43 let namespace = namespace.into();
44 validate_namespace(&namespace)?;
45 Ok(Self { namespace, conn })
46 }
47
48 pub fn redis_key(&self, key: &str) -> String {
50 build_redis_key(&self.namespace, key)
51 }
52}
53
54pub(crate) fn build_redis_key(namespace: &str, key: &str) -> String {
58 format!("{namespace}:{key}")
59}
60
61pub(crate) fn validate_namespace(namespace: &str) -> Result<(), FaucetError> {
62 if namespace.is_empty() {
63 return Err(FaucetError::Config(
64 "Redis state namespace must not be empty".into(),
65 ));
66 }
67 for (i, c) in namespace.char_indices() {
68 let ok = c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.');
69 if !ok {
70 return Err(FaucetError::Config(format!(
71 "Redis state namespace contains illegal character {c:?} at byte {i}"
72 )));
73 }
74 }
75 Ok(())
76}
77
78#[async_trait]
79impl StateStore for RedisStateStore {
80 async fn get(&self, key: &str) -> Result<Option<Value>, FaucetError> {
81 validate_state_key(key)?;
82 let mut conn = self.conn.clone();
83 let raw: Option<String> = conn
84 .get(self.redis_key(key))
85 .await
86 .map_err(|e| FaucetError::State(format!("Redis GET for key '{key}' failed: {e}")))?;
87 match raw {
88 None => Ok(None),
89 Some(s) => {
90 let value: Value = serde_json::from_str(&s).map_err(|e| {
91 FaucetError::State(format!(
92 "stored value for key '{key}' is not valid JSON: {e}"
93 ))
94 })?;
95 Ok(Some(value))
96 }
97 }
98 }
99
100 async fn put(&self, key: &str, value: &Value) -> Result<(), FaucetError> {
101 validate_state_key(key)?;
102 let serialized = serde_json::to_string(value).map_err(|e| {
103 FaucetError::State(format!("failed to serialize state for key '{key}': {e}"))
104 })?;
105 let mut conn = self.conn.clone();
106 let _: () = conn
107 .set(self.redis_key(key), serialized)
108 .await
109 .map_err(|e| FaucetError::State(format!("Redis SET for key '{key}' failed: {e}")))?;
110 tracing::debug!(key, namespace = %self.namespace, "state written to Redis");
111 Ok(())
112 }
113
114 async fn delete(&self, key: &str) -> Result<(), FaucetError> {
115 validate_state_key(key)?;
116 let mut conn = self.conn.clone();
117 let _: i64 = conn
118 .del(self.redis_key(key))
119 .await
120 .map_err(|e| FaucetError::State(format!("Redis DEL for key '{key}' failed: {e}")))?;
121 Ok(())
122 }
123
124 async fn check(
125 &self,
126 ctx: &faucet_core::check::CheckContext,
127 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
128 use faucet_core::check::{CheckReport, Probe};
129
130 let start = std::time::Instant::now();
134 let probe = match tokio::time::timeout(ctx.timeout, self.sentinel_roundtrip()).await {
135 Ok(Ok(())) => Probe::pass("sentinel", start.elapsed()),
136 Ok(Err(e)) => Probe::fail_hint(
137 "sentinel",
138 start.elapsed(),
139 e.to_string(),
140 "verify the Redis server is reachable and the credentials grant read/write access",
141 ),
142 Err(_) => Probe::fail_hint(
143 "sentinel",
144 start.elapsed(),
145 format!(
146 "round-trip timed out after {:?}; Redis did not respond",
147 ctx.timeout
148 ),
149 "verify the Redis server is reachable or raise the check timeout",
150 ),
151 };
152 Ok(CheckReport::single(probe))
153 }
154}
155
156impl RedisStateStore {
157 async fn sentinel_roundtrip(&self) -> Result<(), FaucetError> {
161 let probe = serde_json::json!({ "faucet_doctor": true });
162 self.put(DOCTOR_SENTINEL_KEY, &probe).await?;
163 let got = self.get(DOCTOR_SENTINEL_KEY).await?;
164 let _ = self.delete(DOCTOR_SENTINEL_KEY).await;
166 match got {
167 Some(v) if v == probe => Ok(()),
168 _ => Err(FaucetError::State(
169 "sentinel readback did not match what was written".into(),
170 )),
171 }
172 }
173}
174
175#[cfg(test)]
176mod tests {
177 use super::*;
178
179 #[test]
180 fn build_redis_key_namespaces_consistently() {
181 assert_eq!(
182 build_redis_key("faucet", "github_issues"),
183 "faucet:github_issues"
184 );
185 assert_eq!(build_redis_key("a", "b"), "a:b");
186 }
187
188 #[test]
189 fn validate_namespace_accepts_typical_values() {
190 for ns in ["faucet", "team-1.prod", "a_b", "ABC.123"] {
191 validate_namespace(ns).unwrap_or_else(|e| panic!("expected ok for {ns:?}: {e}"));
192 }
193 }
194
195 #[test]
196 fn validate_namespace_rejects_empty() {
197 let err = validate_namespace("").unwrap_err();
198 assert!(matches!(err, FaucetError::Config(_)));
199 }
200
201 #[test]
202 fn validate_namespace_rejects_illegal_chars() {
203 for ns in ["a:b", "a/b", "a b", "hello world"] {
204 let err = validate_namespace(ns).expect_err(&format!("expected error for {ns:?}"));
205 assert!(matches!(err, FaucetError::Config(_)));
206 }
207 }
208
209 #[tokio::test]
210 async fn connect_rejects_invalid_url() {
211 let result = RedisStateStore::connect("not a url", "faucet").await;
212 match result {
213 Err(FaucetError::Config(msg)) => assert!(msg.contains("invalid Redis URL")),
214 Err(other) => panic!("expected Config error, got {other:?}"),
215 Ok(_) => panic!("expected error, got Ok"),
216 }
217 }
218
219 #[tokio::test]
220 async fn connect_rejects_invalid_namespace() {
221 let result = RedisStateStore::connect("redis://127.0.0.1:6379", "bad:namespace").await;
222 match result {
223 Err(FaucetError::Config(_)) => {}
224 Err(other) => panic!("expected Config error, got {other:?}"),
225 Ok(_) => panic!("expected error, got Ok"),
226 }
227 }
228}