Skip to main content

faucet_source_redis/
stream.rs

1//! Redis source stream executor.
2
3use crate::config::{RedisSourceConfig, RedisSourceType};
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use redis::AsyncCommands;
7use serde_json::{Value, json};
8
9/// A configured Redis source that reads records from Redis data structures.
10pub struct RedisSource {
11    config: RedisSourceConfig,
12}
13
14impl RedisSource {
15    /// Create a new Redis source from the given configuration.
16    pub fn new(config: RedisSourceConfig) -> Self {
17        Self { config }
18    }
19
20    /// Fetch all records from the configured Redis source.
21    pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
22        let client = redis::Client::open(self.config.url.as_str())
23            .map_err(|e| FaucetError::Config(format!("invalid Redis URL: {e}")))?;
24
25        let mut conn = client
26            .get_multiplexed_async_connection()
27            .await
28            .map_err(|e| FaucetError::Config(format!("Redis connection failed: {e}")))?;
29
30        let mut records = match &self.config.source_type {
31            RedisSourceType::List { key } => self.fetch_list(&mut conn, key).await?,
32            RedisSourceType::Stream {
33                key,
34                group,
35                consumer,
36                count,
37            } => {
38                self.fetch_stream(&mut conn, key, group, consumer, count)
39                    .await?
40            }
41            RedisSourceType::Keys { pattern } => self.fetch_keys(&mut conn, pattern).await?,
42        };
43
44        if let Some(max) = self.config.max_records {
45            records.truncate(max);
46        }
47
48        tracing::info!(records = records.len(), "Redis fetch complete");
49        Ok(records)
50    }
51
52    /// Read all elements from a Redis list.
53    async fn fetch_list(
54        &self,
55        conn: &mut redis::aio::MultiplexedConnection,
56        key: &str,
57    ) -> Result<Vec<Value>, FaucetError> {
58        let values: Vec<String> = conn
59            .lrange(key, 0, -1)
60            .await
61            .map_err(|e| FaucetError::Config(format!("LRANGE failed on '{key}': {e}")))?;
62
63        let records = values
64            .into_iter()
65            .map(|v| serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone())))
66            .collect();
67
68        Ok(records)
69    }
70
71    /// Read entries from a Redis stream.
72    async fn fetch_stream(
73        &self,
74        conn: &mut redis::aio::MultiplexedConnection,
75        key: &str,
76        group: &Option<String>,
77        consumer: &Option<String>,
78        count: &Option<usize>,
79    ) -> Result<Vec<Value>, FaucetError> {
80        let entries: redis::streams::StreamReadReply = match (group, consumer) {
81            (Some(group_name), Some(consumer_name)) => {
82                let opts = redis::streams::StreamReadOptions::default().count(count.unwrap_or(100));
83                conn.xread_options(&[key], &[">"], &opts.group(group_name, consumer_name))
84                    .await
85                    .map_err(|e| {
86                        FaucetError::Config(format!("XREADGROUP failed on '{key}': {e}"))
87                    })?
88            }
89            _ => {
90                let mut opts = redis::streams::StreamReadOptions::default();
91                if let Some(c) = count {
92                    opts = opts.count(*c);
93                }
94                conn.xread_options(&[key], &["0"], &opts)
95                    .await
96                    .map_err(|e| FaucetError::Config(format!("XREAD failed on '{key}': {e}")))?
97            }
98        };
99
100        let mut records = Vec::new();
101        for stream_key in &entries.keys {
102            for entry in &stream_key.ids {
103                let mut fields = serde_json::Map::new();
104                for (field_name, field_value) in &entry.map {
105                    let val = match field_value {
106                        redis::Value::BulkString(bytes) => {
107                            let s = String::from_utf8_lossy(bytes);
108                            serde_json::from_str::<Value>(&s)
109                                .unwrap_or_else(|_| Value::String(s.into_owned()))
110                        }
111                        redis::Value::SimpleString(s) => serde_json::from_str::<Value>(s)
112                            .unwrap_or_else(|_| Value::String(s.clone())),
113                        redis::Value::Int(n) => json!(n),
114                        redis::Value::Double(n) => json!(n),
115                        redis::Value::Boolean(b) => json!(b),
116                        redis::Value::Nil => Value::Null,
117                        other => Value::String(format!("{other:?}")),
118                    };
119                    fields.insert(field_name.clone(), val);
120                }
121                records.push(json!({
122                    "id": entry.id,
123                    "fields": Value::Object(fields),
124                }));
125            }
126        }
127
128        Ok(records)
129    }
130
131    /// Scan for keys matching a pattern, then MGET all keys in a single round-trip.
132    async fn fetch_keys(
133        &self,
134        conn: &mut redis::aio::MultiplexedConnection,
135        pattern: &str,
136    ) -> Result<Vec<Value>, FaucetError> {
137        let keys: Vec<String> = {
138            let mut collected = Vec::new();
139            let mut iter: redis::AsyncIter<String> =
140                conn.scan_match(pattern).await.map_err(|e| {
141                    FaucetError::Config(format!("SCAN failed with pattern '{pattern}': {e}"))
142                })?;
143
144            while let Some(key) = iter.next_item().await {
145                collected.push(key);
146            }
147            collected
148        };
149
150        if keys.is_empty() {
151            return Ok(Vec::new());
152        }
153
154        let values: Vec<Option<String>> = redis::cmd("MGET")
155            .arg(&keys)
156            .query_async(conn)
157            .await
158            .map_err(|e| FaucetError::Config(format!("MGET failed: {e}")))?;
159
160        let mut records = Vec::new();
161        for (key, value) in keys.iter().zip(values.into_iter()) {
162            if let Some(v) = value {
163                let parsed =
164                    serde_json::from_str::<Value>(&v).unwrap_or_else(|_| Value::String(v.clone()));
165                records.push(json!({
166                    "key": key,
167                    "value": parsed,
168                }));
169            }
170        }
171
172        Ok(records)
173    }
174}
175
176#[async_trait]
177impl faucet_core::Source for RedisSource {
178    async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
179        RedisSource::fetch_all(self).await
180    }
181
182    fn config_schema(&self) -> serde_json::Value {
183        serde_json::to_value(faucet_core::schema_for!(RedisSourceConfig))
184            .expect("schema serialization")
185    }
186}
187
188#[cfg(test)]
189mod tests {
190    use super::*;
191    use crate::config::RedisSourceConfig;
192
193    #[test]
194    fn creates_source() {
195        let config = RedisSourceConfig::new(
196            "redis://localhost",
197            RedisSourceType::List { key: "test".into() },
198        );
199        let _source = RedisSource::new(config);
200    }
201}