Skip to main content

faucet_source_redis/
config.rs

//! Redis source configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// The type of Redis data structure to read from.
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9#[serde(tag = "type")]
10pub enum RedisSourceType {
11    /// Read all elements from a Redis list via `LRANGE`.
12    List {
13        /// The list key.
14        key: String,
15    },
16    /// Read entries from a Redis stream via `XRANGE` (or `XREAD` / `XREADGROUP`
17    /// for the consumer-group convenience path on `fetch_all`).
18    Stream {
19        /// The stream key.
20        key: String,
21        /// Optional consumer group name. When set, [`fetch_all`](super::RedisSource::fetch_all)
22        /// uses `XREADGROUP`. Streaming via [`stream_pages`](faucet_core::Source::stream_pages)
23        /// always uses `XRANGE` (consumer-group semantics are incompatible
24        /// with the page-then-write streaming contract).
25        group: Option<String>,
26        /// Consumer name within the group (required when `group` is set).
27        consumer: Option<String>,
28        /// Maximum number of entries to read per call. Used by
29        /// [`fetch_all`](super::RedisSource::fetch_all)'s `XREAD` / `XREADGROUP`
30        /// path; ignored by streaming, which uses
31        /// [`RedisSourceConfig::batch_size`] as both the per-page count and
32        /// the `XRANGE COUNT` hint.
33        count: Option<usize>,
34    },
35    /// Scan for keys matching a pattern, then `MGET` each batch.
36    Keys {
37        /// Glob pattern for `SCAN` (e.g. `"user:*"`).
38        pattern: String,
39    },
40}
41
42/// Configuration for the Redis source connector.
43#[derive(Clone, Serialize, Deserialize, JsonSchema)]
44pub struct RedisSourceConfig {
45    /// Redis connection URL (e.g. `"redis://127.0.0.1:6379"`).
46    pub url: String,
47    /// The type of Redis data structure to read from.
48    pub source_type: RedisSourceType,
49    /// Optional maximum number of records to return.
50    pub max_records: Option<usize>,
51    /// Records per emitted [`StreamPage`](faucet_core::StreamPage). Each mode
52    /// maps the knob onto its native paging primitive:
53    ///
54    /// - `Keys`: `SCAN COUNT batch_size` hint, followed by an `MGET` batched
55    ///   to `batch_size` records per page.
56    /// - `Stream`: `XRANGE - + COUNT batch_size`, advancing the start ID per
57    ///   page.
58    /// - `List`: `LRANGE start stop`, sliding the window by `batch_size`.
59    ///
60    /// Defaults to [`DEFAULT_BATCH_SIZE`].
61    ///
62    /// `batch_size = 0` is the "no batching" sentinel: every mode drains its
63    /// underlying primitive in a single round-trip (or, for `Keys`, the
64    /// minimum number of round-trips the `SCAN` cursor needs) and emits the
65    /// entire result set as a single page. Useful for small lookup tables or
66    /// for sinks like SQL `COPY` / BigQuery load jobs that prefer one large
67    /// request to many small ones.
68    #[serde(default = "default_batch_size")]
69    pub batch_size: usize,
70}
71
72fn default_batch_size() -> usize {
73    DEFAULT_BATCH_SIZE
74}
75
76impl std::fmt::Debug for RedisSourceConfig {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("RedisSourceConfig")
79            .field("url", &"***")
80            .field("source_type", &self.source_type)
81            .field("max_records", &self.max_records)
82            .field("batch_size", &self.batch_size)
83            .finish()
84    }
85}
86
87impl RedisSourceConfig {
88    /// Create a new config with the given URL and source type.
89    pub fn new(url: impl Into<String>, source_type: RedisSourceType) -> Self {
90        Self {
91            url: url.into(),
92            source_type,
93            max_records: None,
94            batch_size: DEFAULT_BATCH_SIZE,
95        }
96    }
97
98    /// Set the maximum number of records to return.
99    pub fn max_records(mut self, max: usize) -> Self {
100        self.max_records = Some(max);
101        self
102    }
103
104    /// Set the per-page record count for
105    /// [`Source::stream_pages`](faucet_core::Source::stream_pages).
106    ///
107    /// Pass `0` to opt out of batching — the entire result set is emitted in
108    /// a single [`StreamPage`](faucet_core::StreamPage).
109    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
110        self.batch_size = batch_size;
111        self
112    }
113}
114
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared fixture: a `List` source on `redis://localhost` reading key `"k"`.
    fn list_config() -> RedisSourceConfig {
        RedisSourceConfig::new(
            "redis://localhost",
            RedisSourceType::List { key: "k".into() },
        )
    }

    #[test]
    fn default_config_list() {
        let config = RedisSourceConfig::new(
            "redis://127.0.0.1:6379",
            RedisSourceType::List {
                key: "my_list".into(),
            },
        );
        assert!(config.max_records.is_none());
        // Debug output must mask the URL rather than echo it.
        let debug = format!("{config:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("127.0.0.1"));
    }

    #[test]
    fn builder_methods() {
        let config = RedisSourceConfig::new(
            "redis://localhost",
            RedisSourceType::Stream {
                key: "events".into(),
                group: Some("mygroup".into()),
                consumer: Some("worker1".into()),
                count: Some(100),
            },
        )
        .max_records(500);
        assert_eq!(config.max_records, Some(500));
    }

    #[test]
    fn keys_source_type() {
        let config = RedisSourceConfig::new(
            "redis://localhost",
            RedisSourceType::Keys {
                pattern: "user:*".into(),
            },
        );
        // Report the variant actually seen when the expectation fails.
        match &config.source_type {
            RedisSourceType::Keys { pattern } => assert_eq!(pattern, "user:*"),
            other => panic!("expected Keys variant, got {other:?}"),
        }
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = list_config();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = list_config().with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = list_config().with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = list_config().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "url": "redis://localhost",
            "source_type": { "type": "List", "key": "items" },
            "batch_size": 250
        }"#;
        let config: RedisSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_field_omitted_defaults_to_default_batch_size() {
        let json = r#"{
            "url": "redis://localhost",
            "source_type": { "type": "List", "key": "items" }
        }"#;
        let config: RedisSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}