faucet_source_mongodb/
config.rs1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::fmt;
8
9#[derive(Clone, Serialize, Deserialize, JsonSchema)]
30pub struct MongoSourceConfig {
31 pub connection_uri: String,
33 pub database: String,
35 pub collection: String,
37 pub filter: Option<Value>,
39 pub projection: Option<Value>,
41 pub sort: Option<Value>,
43 pub limit: Option<i64>,
45 pub cursor_batch_size: Option<u32>,
50 #[serde(default = "default_batch_size")]
59 pub batch_size: usize,
60}
61
62fn default_batch_size() -> usize {
63 DEFAULT_BATCH_SIZE
64}
65
66impl MongoSourceConfig {
67 pub fn new(
69 connection_uri: impl Into<String>,
70 database: impl Into<String>,
71 collection: impl Into<String>,
72 ) -> Self {
73 Self {
74 connection_uri: connection_uri.into(),
75 database: database.into(),
76 collection: collection.into(),
77 filter: None,
78 projection: None,
79 sort: None,
80 limit: None,
81 cursor_batch_size: None,
82 batch_size: DEFAULT_BATCH_SIZE,
83 }
84 }
85
86 pub fn filter(mut self, filter: Value) -> Self {
88 self.filter = Some(filter);
89 self
90 }
91
92 pub fn projection(mut self, projection: Value) -> Self {
94 self.projection = Some(projection);
95 self
96 }
97
98 pub fn sort(mut self, sort: Value) -> Self {
100 self.sort = Some(sort);
101 self
102 }
103
104 pub fn limit(mut self, limit: i64) -> Self {
106 self.limit = Some(limit);
107 self
108 }
109
110 pub fn cursor_batch_size(mut self, cursor_batch_size: u32) -> Self {
114 self.cursor_batch_size = Some(cursor_batch_size);
115 self
116 }
117
118 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
123 self.batch_size = batch_size;
124 self
125 }
126}
127
128impl fmt::Debug for MongoSourceConfig {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 f.debug_struct("MongoSourceConfig")
131 .field("connection_uri", &"***")
132 .field("database", &self.database)
133 .field("collection", &self.collection)
134 .field("filter", &self.filter)
135 .field("projection", &self.projection)
136 .field("sort", &self.sort)
137 .field("limit", &self.limit)
138 .field("cursor_batch_size", &self.cursor_batch_size)
139 .field("batch_size", &self.batch_size)
140 .finish()
141 }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `new` stores the three required values and leaves everything else at
    /// its default.
    #[test]
    fn default_config() {
        let config = MongoSourceConfig::new("mongodb://localhost:27017", "testdb", "users");
        assert_eq!(config.connection_uri, "mongodb://localhost:27017");
        assert_eq!(config.database, "testdb");
        assert_eq!(config.collection, "users");
        assert!(config.filter.is_none());
        assert!(config.projection.is_none());
        assert!(config.sort.is_none());
        assert!(config.limit.is_none());
        assert!(config.cursor_batch_size.is_none());
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
    }

    /// Each builder method sets exactly the field it is named after.
    #[test]
    fn builder_methods() {
        let config = MongoSourceConfig::new("mongodb://localhost:27017", "testdb", "users")
            .filter(json!({"active": true}))
            .projection(json!({"_id": 0, "name": 1}))
            .sort(json!({"name": 1}))
            .limit(500)
            .cursor_batch_size(100)
            .with_batch_size(2000);

        assert_eq!(config.filter.unwrap(), json!({"active": true}));
        assert_eq!(config.projection.unwrap(), json!({"_id": 0, "name": 1}));
        assert_eq!(config.sort.unwrap(), json!({"name": 1}));
        assert_eq!(config.limit, Some(500));
        assert_eq!(config.cursor_batch_size, Some(100));
        assert_eq!(config.batch_size, 2000);
    }

    /// Credentials embedded in the URI must not leak through `Debug`.
    #[test]
    fn debug_masks_connection_uri() {
        let config =
            MongoSourceConfig::new("mongodb://user:secret@host:27017/db", "testdb", "users");
        let debug = format!("{config:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret"));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = MongoSourceConfig::new("mongodb://localhost:27017", "db", "c");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config =
            MongoSourceConfig::new("mongodb://localhost:27017", "db", "c").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    /// The builder stores `0` untouched and the shared validator accepts it.
    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config =
            MongoSourceConfig::new("mongodb://localhost:27017", "db", "c").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    /// The builder does not validate; the shared validator rejects oversize
    /// values.
    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = MongoSourceConfig::new("mongodb://localhost:27017", "db", "c")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c",
            "batch_size": 250
        }"#;
        let config: MongoSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
        assert!(config.cursor_batch_size.is_none());
    }

    /// Exercises the `#[serde(default = "default_batch_size")]` path: input
    /// without `batch_size` (or any optional field) must still deserialize.
    #[test]
    fn batch_size_defaults_when_omitted_from_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c"
        }"#;
        let config: MongoSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
        assert!(config.filter.is_none());
        assert!(config.projection.is_none());
        assert!(config.sort.is_none());
        assert!(config.limit.is_none());
        assert!(config.cursor_batch_size.is_none());
    }

    #[test]
    fn cursor_batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c",
            "cursor_batch_size": 100,
            "batch_size": 500
        }"#;
        let config: MongoSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.cursor_batch_size, Some(100));
        assert_eq!(config.batch_size, 500);
    }
}