1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
//! Redis source configuration.
use faucet_core::DEFAULT_BATCH_SIZE;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
/// The type of Redis data structure to read from.
// Serialized in serde's internally tagged form (`tag = "type"`): the variant
// name travels in a `"type"` field next to the variant's own fields, e.g.
// `{ "type": "List", "key": "items" }`.
// NOTE(review): the `JsonSchema` derive presumably copies the `///` docs in
// this enum into the generated schema descriptions — confirm before rewording.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type")]
pub enum RedisSourceType {
/// Read all elements from a Redis list via `LRANGE`.
List {
/// The list key.
key: String,
},
/// Read entries from a Redis stream via `XRANGE` (or `XREAD` / `XREADGROUP`
/// for the consumer-group convenience path on `fetch_all`).
Stream {
/// The stream key.
key: String,
/// Optional consumer group name. When set, [`fetch_all`](super::RedisSource::fetch_all)
/// uses `XREADGROUP`. Streaming via [`stream_pages`](faucet_core::Source::stream_pages)
/// always uses `XRANGE` (consumer-group semantics are incompatible
/// with the page-then-write streaming contract).
group: Option<String>,
/// Consumer name within the group (required when `group` is set).
// NOTE(review): the "required when `group` is set" rule is not enforced by
// this type or anywhere in this file; presumably the reader validates it —
// confirm against the code that consumes this config.
consumer: Option<String>,
/// Maximum number of entries to read per call. Used by
/// [`fetch_all`](super::RedisSource::fetch_all)'s `XREAD` / `XREADGROUP`
/// path; ignored by streaming, which uses
/// [`RedisSourceConfig::batch_size`] as both the per-page count and
/// the `XRANGE COUNT` hint.
count: Option<usize>,
},
/// Scan for keys matching a pattern, then `MGET` each batch.
Keys {
/// Glob pattern for `SCAN` (e.g. `"user:*"`).
pattern: String,
},
}
/// Configuration for the Redis source connector.
// `Debug` is intentionally absent from the derive list: this file provides a
// hand-written `Debug` impl that prints `"***"` in place of `url`
// (presumably because connection URLs can embed credentials).
// NOTE(review): `Serialize` is still derived and writes `url` verbatim, so the
// redaction covers `Debug` output only.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
pub struct RedisSourceConfig {
/// Redis connection URL (e.g. `"redis://127.0.0.1:6379"`).
pub url: String,
/// The type of Redis data structure to read from.
pub source_type: RedisSourceType,
/// Optional maximum number of records to return.
// No `#[serde(default)]` here: serde already deserializes a missing `Option`
// field as `None`, so the key may be omitted from the input.
pub max_records: Option<usize>,
/// Records per emitted [`StreamPage`](faucet_core::StreamPage). Each mode
/// maps the knob onto its native paging primitive:
///
/// - `Keys`: `SCAN COUNT batch_size` hint, followed by an `MGET` batched
/// to `batch_size` records per page.
/// - `Stream`: `XRANGE - + COUNT batch_size`, advancing the start ID per
/// page.
/// - `List`: `LRANGE start stop`, sliding the window by `batch_size`.
///
/// Defaults to [`DEFAULT_BATCH_SIZE`].
///
/// `batch_size = 0` is the "no batching" sentinel: every mode drains its
/// underlying primitive in a single round-trip (or, for `Keys`, the
/// minimum number of round-trips the `SCAN` cursor needs) and emits the
/// entire result set as a single page. Useful for small lookup tables or
/// for sinks like SQL `COPY` / BigQuery load jobs that prefer one large
/// request to many small ones.
// When the key is absent from the input, serde calls `default_batch_size()`
// (defined right after this struct) to fill the field.
#[serde(default = "default_batch_size")]
pub batch_size: usize,
}
/// Serde fallback for [`RedisSourceConfig::batch_size`]: returns
/// [`DEFAULT_BATCH_SIZE`] when the `batch_size` key is absent from the input.
///
/// Serde's `default = "..."` attribute takes the path of a function rather
/// than a constant, hence this thin wrapper.
fn default_batch_size() -> usize {
DEFAULT_BATCH_SIZE
}
impl std::fmt::Debug for RedisSourceConfig {
    /// Formats the config for diagnostics with the connection URL redacted.
    ///
    /// The real `url` is never written; the literal `"***"` is printed in its
    /// place. Every other field is rendered through its own `Debug` impl.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: a newly added field fails to compile here
        // until it is either printed or explicitly ignored.
        let Self {
            url: _,
            source_type,
            max_records,
            batch_size,
        } = self;

        let mut out = f.debug_struct("RedisSourceConfig");
        out.field("url", &"***");
        out.field("source_type", source_type);
        out.field("max_records", max_records);
        out.field("batch_size", batch_size);
        out.finish()
    }
}
impl RedisSourceConfig {
    /// Create a new config with the given URL and source type.
    ///
    /// `max_records` starts unset (`None`) and `batch_size` starts at
    /// [`DEFAULT_BATCH_SIZE`]; use the builder methods below to override them.
    #[must_use]
    pub fn new(url: impl Into<String>, source_type: RedisSourceType) -> Self {
        Self {
            url: url.into(),
            source_type,
            max_records: None,
            batch_size: DEFAULT_BATCH_SIZE,
        }
    }

    /// Set the maximum number of records to return.
    ///
    /// Consumes the config and returns the updated value, so the result must
    /// be bound or chained — calling this as a bare statement changes nothing.
    #[must_use = "builder methods return the updated config instead of mutating in place"]
    pub fn max_records(mut self, max: usize) -> Self {
        self.max_records = Some(max);
        self
    }

    /// Set the per-page record count for
    /// [`Source::stream_pages`](faucet_core::Source::stream_pages).
    ///
    /// Pass `0` to opt out of batching — the entire result set is emitted in
    /// a single [`StreamPage`](faucet_core::StreamPage).
    ///
    /// The value is stored as given; no range check is performed here.
    /// Consumes the config and returns the updated value, so the result must
    /// be bound or chained.
    #[must_use = "builder methods return the updated config instead of mutating in place"]
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared fixture: a `List`-backed config on a local URL, used by the
    /// batch-size tests that only care about the `batch_size` field.
    fn list_config() -> RedisSourceConfig {
        let source_type = RedisSourceType::List { key: "k".into() };
        RedisSourceConfig::new("redis://localhost", source_type)
    }

    /// A fresh config has no record cap and never leaks its URL via `Debug`.
    #[test]
    fn default_config_list() {
        let source_type = RedisSourceType::List {
            key: "my_list".into(),
        };
        let config = RedisSourceConfig::new("redis://127.0.0.1:6379", source_type);
        assert_eq!(config.max_records, None);

        // Debug should mask URL
        let rendered = format!("{config:?}");
        assert!(rendered.contains("***"));
        assert!(!rendered.contains("127.0.0.1"));
    }

    /// `max_records` stores the supplied cap.
    #[test]
    fn builder_methods() {
        let source_type = RedisSourceType::Stream {
            key: "events".into(),
            group: Some("mygroup".into()),
            consumer: Some("worker1".into()),
            count: Some(100),
        };
        let config = RedisSourceConfig::new("redis://localhost", source_type).max_records(500);
        assert_eq!(config.max_records, Some(500));
    }

    /// The `Keys` variant round-trips its pattern through the constructor.
    #[test]
    fn keys_source_type() {
        let source_type = RedisSourceType::Keys {
            pattern: "user:*".into(),
        };
        let config = RedisSourceConfig::new("redis://localhost", source_type);
        match &config.source_type {
            RedisSourceType::Keys { pattern } => assert_eq!(pattern, "user:*"),
            _ => panic!("expected Keys variant"),
        }
    }

    /// The constructor seeds `batch_size` from the crate-wide default.
    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = list_config();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    /// `with_batch_size` replaces the default.
    #[test]
    fn with_batch_size_overrides_default() {
        let config = list_config().with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    /// Zero is stored as-is and passes `validate_batch_size`.
    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = list_config().with_batch_size(0);
        assert_eq!(config.batch_size, 0);

        let verdict = faucet_core::validate_batch_size(config.batch_size);
        assert!(verdict.is_ok());
    }

    /// One past `MAX_BATCH_SIZE` is stored by the builder but fails validation.
    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let too_big = faucet_core::MAX_BATCH_SIZE + 1;
        let config = list_config().with_batch_size(too_big);

        let verdict = faucet_core::validate_batch_size(config.batch_size);
        assert!(verdict.is_err());
    }

    /// An explicit `batch_size` key in JSON is honoured.
    #[test]
    fn batch_size_deserializes_from_json() {
        // Literal kept flush-left so its bytes match the original input.
        let json = r#"{
"url": "redis://localhost",
"source_type": { "type": "List", "key": "items" },
"batch_size": 250
}"#;
        let config = serde_json::from_str::<RedisSourceConfig>(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    /// Omitting `batch_size` from JSON falls back to the crate-wide default.
    #[test]
    fn batch_size_field_omitted_defaults_to_default_batch_size() {
        // Literal kept flush-left so its bytes match the original input.
        let json = r#"{
"url": "redis://localhost",
"source_type": { "type": "List", "key": "items" }
}"#;
        let config = serde_json::from_str::<RedisSourceConfig>(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}