faucet_source_redis/
stream.rs1use crate::config::{RedisSourceConfig, RedisSourceType};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use redis::AsyncCommands;
7use serde_json::{Value, json};
8
9pub struct RedisSource {
11 config: RedisSourceConfig,
12}
13
14impl RedisSource {
15 pub fn new(config: RedisSourceConfig) -> Self {
17 Self { config }
18 }
19
20 pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
22 let client = redis::Client::open(self.config.url.as_str())
23 .map_err(|e| FaucetError::Config(format!("invalid Redis URL: {e}")))?;
24
25 let mut conn = client
26 .get_multiplexed_async_connection()
27 .await
28 .map_err(|e| FaucetError::Config(format!("Redis connection failed: {e}")))?;
29
30 let mut records = match &self.config.source_type {
31 RedisSourceType::List { key } => self.fetch_list(&mut conn, key).await?,
32 RedisSourceType::Stream {
33 key,
34 group,
35 consumer,
36 count,
37 } => {
38 self.fetch_stream(&mut conn, key, group, consumer, count)
39 .await?
40 }
41 RedisSourceType::Keys { pattern } => self.fetch_keys(&mut conn, pattern).await?,
42 };
43
44 if let Some(max) = self.config.max_records {
45 records.truncate(max);
46 }
47
48 tracing::info!(records = records.len(), "Redis fetch complete");
49 Ok(records)
50 }
51
52 async fn fetch_list(
54 &self,
55 conn: &mut redis::aio::MultiplexedConnection,
56 key: &str,
57 ) -> Result<Vec<Value>, FaucetError> {
58 let values: Vec<String> = conn
59 .lrange(key, 0, -1)
60 .await
61 .map_err(|e| FaucetError::Config(format!("LRANGE failed on '{key}': {e}")))?;
62
63 let records = values
64 .into_iter()
65 .map(|v| serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone())))
66 .collect();
67
68 Ok(records)
69 }
70
71 async fn fetch_stream(
73 &self,
74 conn: &mut redis::aio::MultiplexedConnection,
75 key: &str,
76 group: &Option<String>,
77 consumer: &Option<String>,
78 count: &Option<usize>,
79 ) -> Result<Vec<Value>, FaucetError> {
80 let entries: redis::streams::StreamReadReply = match (group, consumer) {
81 (Some(group_name), Some(consumer_name)) => {
82 let opts = redis::streams::StreamReadOptions::default().count(count.unwrap_or(100));
83 conn.xread_options(&[key], &[">"], &opts.group(group_name, consumer_name))
84 .await
85 .map_err(|e| {
86 FaucetError::Config(format!("XREADGROUP failed on '{key}': {e}"))
87 })?
88 }
89 _ => {
90 let mut opts = redis::streams::StreamReadOptions::default();
91 if let Some(c) = count {
92 opts = opts.count(*c);
93 }
94 conn.xread_options(&[key], &["0"], &opts)
95 .await
96 .map_err(|e| FaucetError::Config(format!("XREAD failed on '{key}': {e}")))?
97 }
98 };
99
100 let mut records = Vec::new();
101 for stream_key in &entries.keys {
102 for entry in &stream_key.ids {
103 let mut fields = serde_json::Map::new();
104 for (field_name, field_value) in &entry.map {
105 let val = match field_value {
106 redis::Value::BulkString(bytes) => {
107 let s = String::from_utf8_lossy(bytes);
108 serde_json::from_str::<Value>(&s)
109 .unwrap_or_else(|_| Value::String(s.into_owned()))
110 }
111 redis::Value::SimpleString(s) => serde_json::from_str::<Value>(s)
112 .unwrap_or_else(|_| Value::String(s.clone())),
113 redis::Value::Int(n) => json!(n),
114 redis::Value::Double(n) => json!(n),
115 redis::Value::Boolean(b) => json!(b),
116 redis::Value::Nil => Value::Null,
117 other => Value::String(format!("{other:?}")),
118 };
119 fields.insert(field_name.clone(), val);
120 }
121 records.push(json!({
122 "id": entry.id,
123 "fields": Value::Object(fields),
124 }));
125 }
126 }
127
128 Ok(records)
129 }
130
131 async fn fetch_keys(
133 &self,
134 conn: &mut redis::aio::MultiplexedConnection,
135 pattern: &str,
136 ) -> Result<Vec<Value>, FaucetError> {
137 let keys: Vec<String> = {
138 let mut collected = Vec::new();
139 let mut iter: redis::AsyncIter<String> =
140 conn.scan_match(pattern).await.map_err(|e| {
141 FaucetError::Config(format!("SCAN failed with pattern '{pattern}': {e}"))
142 })?;
143
144 while let Some(key) = iter.next_item().await {
145 collected.push(key);
146 }
147 collected
148 };
149
150 if keys.is_empty() {
151 return Ok(Vec::new());
152 }
153
154 let values: Vec<Option<String>> = redis::cmd("MGET")
155 .arg(&keys)
156 .query_async(conn)
157 .await
158 .map_err(|e| FaucetError::Config(format!("MGET failed: {e}")))?;
159
160 let mut records = Vec::new();
161 for (key, value) in keys.iter().zip(values.into_iter()) {
162 if let Some(v) = value {
163 let parsed =
164 serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone()));
165 records.push(json!({
166 "key": key,
167 "value": parsed,
168 }));
169 }
170 }
171
172 Ok(records)
173 }
174}
175
176#[async_trait]
177impl faucet_core::Source for RedisSource {
178 async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
179 RedisSource::fetch_all(self).await
180 }
181
182 fn config_schema(&self) -> serde_json::Value {
183 serde_json::to_value(faucet_core::schema_for!(RedisSourceConfig))
184 .expect("schema serialization")
185 }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RedisSourceConfig;

    /// Building a source from a list-type config must succeed without
    /// touching the network.
    #[test]
    fn creates_source() {
        let source_type = RedisSourceType::List { key: "test".into() };
        let _source = RedisSource::new(RedisSourceConfig::new("redis://localhost", source_type));
    }
}