// faucet_source_mongodb/config.rs

//! MongoDB source configuration.

use faucet_core::DEFAULT_BATCH_SIZE;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt;

9/// Configuration for the MongoDB source connector.
10///
11/// # Example
12///
13/// ```
14/// use faucet_source_mongodb::MongoSourceConfig;
15/// use serde_json::json;
16///
17/// let config = MongoSourceConfig::new(
18///     "mongodb://localhost:27017",
19///     "my_database",
20///     "my_collection",
21/// )
22/// .filter(json!({"status": "active"}))
23/// .projection(json!({"_id": 0, "name": 1, "email": 1}))
24/// .sort(json!({"created_at": -1}))
25/// .limit(1000)
26/// .cursor_batch_size(200)
27/// .with_batch_size(500);
28/// ```
29#[derive(Clone, Serialize, Deserialize, JsonSchema)]
30pub struct MongoSourceConfig {
31    /// MongoDB connection URI (e.g. `mongodb://localhost:27017`).
32    pub connection_uri: String,
33    /// Database name.
34    pub database: String,
35    /// Collection name.
36    pub collection: String,
37    /// Optional query filter as JSON, converted to a BSON `Document` at query time.
38    pub filter: Option<Value>,
39    /// Optional field projection as JSON.
40    pub projection: Option<Value>,
41    /// Optional sort specification as JSON.
42    pub sort: Option<Value>,
43    /// Maximum number of documents to return.
44    pub limit: Option<i64>,
45    /// Cursor batch size: documents per server round-trip on the MongoDB
46    /// driver cursor. This is a wire-level tuning knob — distinct from
47    /// [`Self::batch_size`], which controls the size of each emitted
48    /// [`StreamPage`](faucet_core::StreamPage).
49    pub cursor_batch_size: Option<u32>,
50    /// Records per emitted [`StreamPage`](faucet_core::StreamPage). Documents
51    /// are pulled from the cursor and yielded whenever the buffer reaches
52    /// this size. Defaults to [`DEFAULT_BATCH_SIZE`].
53    ///
54    /// `batch_size = 0` is the "no batching" sentinel: the cursor is fully
55    /// drained and the entire result set is emitted in a single page. Useful
56    /// for small lookup tables or for sinks (e.g. SQL `COPY`, BigQuery load
57    /// jobs) that prefer one large request to many small ones.
58    #[serde(default = "default_batch_size")]
59    pub batch_size: usize,
60}
61
62fn default_batch_size() -> usize {
63    DEFAULT_BATCH_SIZE
64}
65
66impl MongoSourceConfig {
67    /// Create a new config with the required connection URI, database, and collection.
68    pub fn new(
69        connection_uri: impl Into<String>,
70        database: impl Into<String>,
71        collection: impl Into<String>,
72    ) -> Self {
73        Self {
74            connection_uri: connection_uri.into(),
75            database: database.into(),
76            collection: collection.into(),
77            filter: None,
78            projection: None,
79            sort: None,
80            limit: None,
81            cursor_batch_size: None,
82            batch_size: DEFAULT_BATCH_SIZE,
83        }
84    }
85
86    /// Set the query filter (JSON object converted to BSON at query time).
87    pub fn filter(mut self, filter: Value) -> Self {
88        self.filter = Some(filter);
89        self
90    }
91
92    /// Set the field projection.
93    pub fn projection(mut self, projection: Value) -> Self {
94        self.projection = Some(projection);
95        self
96    }
97
98    /// Set the sort specification.
99    pub fn sort(mut self, sort: Value) -> Self {
100        self.sort = Some(sort);
101        self
102    }
103
104    /// Set the maximum number of documents to return.
105    pub fn limit(mut self, limit: i64) -> Self {
106        self.limit = Some(limit);
107        self
108    }
109
110    /// Set the MongoDB driver cursor batch size (documents per server
111    /// round-trip). This is independent of [`Self::with_batch_size`], which
112    /// controls the size of each emitted `StreamPage`.
113    pub fn cursor_batch_size(mut self, cursor_batch_size: u32) -> Self {
114        self.cursor_batch_size = Some(cursor_batch_size);
115        self
116    }
117
118    /// Set the per-page document count for [`Source::stream_pages`](faucet_core::Source::stream_pages).
119    ///
120    /// Pass `0` to opt out of batching — the entire result set is emitted in
121    /// a single [`StreamPage`](faucet_core::StreamPage).
122    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
123        self.batch_size = batch_size;
124        self
125    }
126}
127
128impl fmt::Debug for MongoSourceConfig {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        f.debug_struct("MongoSourceConfig")
131            .field("connection_uri", &"***")
132            .field("database", &self.database)
133            .field("collection", &self.collection)
134            .field("filter", &self.filter)
135            .field("projection", &self.projection)
136            .field("sort", &self.sort)
137            .field("limit", &self.limit)
138            .field("cursor_batch_size", &self.cursor_batch_size)
139            .field("batch_size", &self.batch_size)
140            .finish()
141    }
142}
143
/// Unit tests for [`MongoSourceConfig`]: constructor defaults, builder
/// setters, `Debug` redaction, batch-size validation, and JSON
/// deserialization (including keys that are left out).
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn default_config() {
        let config = MongoSourceConfig::new("mongodb://localhost:27017", "testdb", "users");
        assert_eq!(config.connection_uri, "mongodb://localhost:27017");
        assert_eq!(config.database, "testdb");
        assert_eq!(config.collection, "users");
        assert!(config.filter.is_none());
        assert!(config.projection.is_none());
        assert!(config.sort.is_none());
        assert!(config.limit.is_none());
        assert!(config.cursor_batch_size.is_none());
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn builder_methods() {
        let config = MongoSourceConfig::new("mongodb://localhost:27017", "testdb", "users")
            .filter(json!({"active": true}))
            .projection(json!({"_id": 0, "name": 1}))
            .sort(json!({"name": 1}))
            .limit(500)
            .cursor_batch_size(100)
            .with_batch_size(2000);

        assert_eq!(config.filter.unwrap(), json!({"active": true}));
        assert_eq!(config.projection.unwrap(), json!({"_id": 0, "name": 1}));
        assert_eq!(config.sort.unwrap(), json!({"name": 1}));
        assert_eq!(config.limit, Some(500));
        assert_eq!(config.cursor_batch_size, Some(100));
        assert_eq!(config.batch_size, 2000);
    }

    #[test]
    fn debug_masks_connection_uri() {
        let config =
            MongoSourceConfig::new("mongodb://user:secret@host:27017/db", "testdb", "users");
        let debug = format!("{config:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret"));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = MongoSourceConfig::new("mongodb://localhost:27017", "db", "c");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config =
            MongoSourceConfig::new("mongodb://localhost:27017", "db", "c").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config =
            MongoSourceConfig::new("mongodb://localhost:27017", "db", "c").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = MongoSourceConfig::new("mongodb://localhost:27017", "db", "c")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c",
            "batch_size": 250
        }"#;
        let config: MongoSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
        assert!(config.cursor_batch_size.is_none());
    }

    #[test]
    fn cursor_batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c",
            "cursor_batch_size": 100,
            "batch_size": 500
        }"#;
        let config: MongoSourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.cursor_batch_size, Some(100));
        assert_eq!(config.batch_size, 500);
    }

    // Covers the `#[serde(default = "default_batch_size")]` path and the
    // implicit `None` for missing `Option` keys, which no other test reaches:
    // every other JSON fixture supplies `batch_size` explicitly.
    #[test]
    fn omitted_keys_fall_back_to_defaults_when_deserializing() {
        let raw = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c"
        }"#;
        let config: MongoSourceConfig = serde_json::from_str(raw).unwrap();
        assert!(config.filter.is_none());
        assert!(config.projection.is_none());
        assert!(config.sort.is_none());
        assert!(config.limit.is_none());
        assert!(config.cursor_batch_size.is_none());
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
    }
}
243}