Skip to main content

faucet_source_redis/
config.rs

1//! Redis source configuration.
2
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
6/// The type of Redis data structure to read from.
7#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
8#[serde(tag = "type")]
9pub enum RedisSourceType {
10    /// Read all elements from a Redis list via `LRANGE 0 -1`.
11    List {
12        /// The list key.
13        key: String,
14    },
15    /// Read entries from a Redis stream via `XREAD` or `XREADGROUP`.
16    Stream {
17        /// The stream key.
18        key: String,
19        /// Optional consumer group name. When set, uses `XREADGROUP`.
20        group: Option<String>,
21        /// Consumer name within the group (required when `group` is set).
22        consumer: Option<String>,
23        /// Maximum number of entries to read per call.
24        count: Option<usize>,
25    },
26    /// Scan for keys matching a pattern, then `GET` each key.
27    Keys {
28        /// Glob pattern for `SCAN` (e.g. `"user:*"`).
29        pattern: String,
30    },
31}
32
33/// Configuration for the Redis source connector.
34#[derive(Clone, Serialize, Deserialize, JsonSchema)]
35pub struct RedisSourceConfig {
36    /// Redis connection URL (e.g. `"redis://127.0.0.1:6379"`).
37    pub url: String,
38    /// The type of Redis data structure to read from.
39    pub source_type: RedisSourceType,
40    /// Optional maximum number of records to return.
41    pub max_records: Option<usize>,
42}
43
44impl std::fmt::Debug for RedisSourceConfig {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        f.debug_struct("RedisSourceConfig")
47            .field("url", &"***")
48            .field("source_type", &self.source_type)
49            .field("max_records", &self.max_records)
50            .finish()
51    }
52}
53
54impl RedisSourceConfig {
55    /// Create a new config with the given URL and source type.
56    pub fn new(url: impl Into<String>, source_type: RedisSourceType) -> Self {
57        Self {
58            url: url.into(),
59            source_type,
60            max_records: None,
61        }
62    }
63
64    /// Set the maximum number of records to return.
65    pub fn max_records(mut self, max: usize) -> Self {
66        self.max_records = Some(max);
67        self
68    }
69}
70
71#[cfg(test)]
72mod tests {
73    use super::*;
74
75    #[test]
76    fn default_config_list() {
77        let config = RedisSourceConfig::new(
78            "redis://127.0.0.1:6379",
79            RedisSourceType::List {
80                key: "my_list".into(),
81            },
82        );
83        assert!(config.max_records.is_none());
84        // Debug should mask URL
85        let debug = format!("{config:?}");
86        assert!(debug.contains("***"));
87        assert!(!debug.contains("127.0.0.1"));
88    }
89
90    #[test]
91    fn builder_methods() {
92        let config = RedisSourceConfig::new(
93            "redis://localhost",
94            RedisSourceType::Stream {
95                key: "events".into(),
96                group: Some("mygroup".into()),
97                consumer: Some("worker1".into()),
98                count: Some(100),
99            },
100        )
101        .max_records(500);
102        assert_eq!(config.max_records, Some(500));
103    }
104
105    #[test]
106    fn keys_source_type() {
107        let config = RedisSourceConfig::new(
108            "redis://localhost",
109            RedisSourceType::Keys {
110                pattern: "user:*".into(),
111            },
112        );
113        if let RedisSourceType::Keys { ref pattern } = config.source_type {
114            assert_eq!(pattern, "user:*");
115        } else {
116            panic!("expected Keys variant");
117        }
118    }
119}