Skip to main content

faucet_sink_mongodb/
config.rs

1//! MongoDB sink configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use std::fmt;
7
8/// Configuration for the MongoDB sink connector.
9///
10/// # Example
11///
12/// ```
13/// use faucet_sink_mongodb::MongoSinkConfig;
14///
15/// let config = MongoSinkConfig::new(
16///     "mongodb://localhost:27017",
17///     "my_database",
18///     "my_collection",
19/// )
20/// .with_batch_size(1000);
21/// ```
22#[derive(Clone, Serialize, Deserialize, JsonSchema)]
23pub struct MongoSinkConfig {
24    /// MongoDB connection URI (e.g. `mongodb://localhost:27017`).
25    pub connection_uri: String,
26    /// Database name.
27    pub database: String,
28    /// Collection name.
29    pub collection: String,
30    /// Maximum number of documents per `insert_many` call. Defaults to
31    /// [`DEFAULT_BATCH_SIZE`] (1000), which is a good balance for MongoDB's
32    /// per-request limits and round-trip cost.
33    ///
34    /// When `write_batch` is handed a slice larger than `batch_size`, the
35    /// sink re-chunks it into `batch_size` slices and issues one
36    /// `insert_many` per chunk. `batch_size = 0` is the **"no batching"
37    /// sentinel** — the records slice is forwarded as a single
38    /// `insert_many`, no matter how large, so upstream `StreamPage` framing
39    /// flows through untouched.
40    #[serde(default = "default_batch_size")]
41    pub batch_size: usize,
42    /// Whether `insert_many` is **ordered**. Default `false` (unordered).
43    ///
44    /// With the MongoDB default of `ordered = true`, the first failing
45    /// document (a duplicate `_id`, a validation error, …) aborts the rest of
46    /// the batch — the documents before it commit, those after are silently
47    /// dropped. Unordered (`false`) instead attempts every document and only
48    /// the genuinely-bad ones fail, so a single poison record can't drop the
49    /// rest of the batch (#78/#20).
50    #[serde(default)]
51    pub ordered: bool,
52}
53
54fn default_batch_size() -> usize {
55    DEFAULT_BATCH_SIZE
56}
57
58impl MongoSinkConfig {
59    /// Create a new config with the required connection URI, database, and collection.
60    pub fn new(
61        connection_uri: impl Into<String>,
62        database: impl Into<String>,
63        collection: impl Into<String>,
64    ) -> Self {
65        Self {
66            connection_uri: connection_uri.into(),
67            database: database.into(),
68            collection: collection.into(),
69            batch_size: DEFAULT_BATCH_SIZE,
70            ordered: false,
71        }
72    }
73
74    /// Set whether `insert_many` is ordered (default `false`).
75    pub fn with_ordered(mut self, ordered: bool) -> Self {
76        self.ordered = ordered;
77        self
78    }
79
80    /// Set the maximum number of documents per `insert_many` call.
81    ///
82    /// Pass `0` to opt out of re-chunking — the entire records slice handed
83    /// to `write_batch` is sent in a single `insert_many` call, preserving
84    /// upstream `StreamPage` framing.
85    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
86        self.batch_size = batch_size;
87        self
88    }
89}
90
91impl fmt::Debug for MongoSinkConfig {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        f.debug_struct("MongoSinkConfig")
94            .field("connection_uri", &"***")
95            .field("database", &self.database)
96            .field("collection", &self.collection)
97            .field("batch_size", &self.batch_size)
98            .field("ordered", &self.ordered)
99            .finish()
100    }
101}
102
#[cfg(test)]
mod tests {
    //! Unit tests for [`MongoSinkConfig`]: constructor defaults, builder
    //! overrides, credential masking in `Debug`, and serde behavior.

    use super::*;

    #[test]
    fn default_config() {
        let config = MongoSinkConfig::new("mongodb://localhost:27017", "testdb", "users");
        assert_eq!(config.connection_uri, "mongodb://localhost:27017");
        assert_eq!(config.database, "testdb");
        assert_eq!(config.collection, "users");
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
        assert!(!config.ordered);
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = MongoSinkConfig::new("mongodb://localhost:27017", "testdb", "users");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = MongoSinkConfig::new("mongodb://localhost:27017", "testdb", "users")
            .with_batch_size(2000);
        assert_eq!(config.batch_size, 2000);
    }

    // Unordered inserts are the safe default: one bad document must not
    // cause the rest of the batch to be dropped.
    #[test]
    fn ordered_defaults_to_false() {
        let config = MongoSinkConfig::new("mongodb://localhost:27017", "testdb", "users");
        assert!(!config.ordered);
    }

    #[test]
    fn with_ordered_overrides_default() {
        let config =
            MongoSinkConfig::new("mongodb://localhost:27017", "testdb", "users").with_ordered(true);
        assert!(config.ordered);

        let config = config.with_ordered(false);
        assert!(!config.ordered);
    }

    #[test]
    fn debug_masks_connection_uri() {
        let config = MongoSinkConfig::new("mongodb://user:secret@host:27017/db", "testdb", "users");
        let debug = format!("{config:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret"));
        // Non-sensitive fields must still be visible for diagnostics.
        assert!(debug.contains("testdb"));
        assert!(debug.contains("users"));
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config =
            MongoSinkConfig::new("mongodb://localhost:27017", "db", "c").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = MongoSinkConfig::new("mongodb://localhost:27017", "db", "c")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c",
            "batch_size": 250
        }"#;
        let config: MongoSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_in_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c"
        }"#;
        let config: MongoSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn ordered_deserializes_from_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c",
            "ordered": true
        }"#;
        let config: MongoSinkConfig = serde_json::from_str(json).unwrap();
        assert!(config.ordered);
    }

    #[test]
    fn ordered_defaults_to_false_when_absent_in_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c"
        }"#;
        let config: MongoSinkConfig = serde_json::from_str(json).unwrap();
        assert!(!config.ordered);
    }

    #[test]
    fn serde_round_trip_preserves_all_fields() {
        let original = MongoSinkConfig::new("mongodb://localhost:27017", "db", "c")
            .with_batch_size(42)
            .with_ordered(true);
        let json = serde_json::to_string(&original).unwrap();
        let restored: MongoSinkConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.connection_uri, original.connection_uri);
        assert_eq!(restored.database, original.database);
        assert_eq!(restored.collection, original.collection);
        assert_eq!(restored.batch_size, 42);
        assert!(restored.ordered);
    }
}