faucet_sink_mongodb/
config.rs1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use std::fmt;
7
8#[derive(Clone, Serialize, Deserialize, JsonSchema)]
23pub struct MongoSinkConfig {
24 pub connection_uri: String,
26 pub database: String,
28 pub collection: String,
30 #[serde(default = "default_batch_size")]
41 pub batch_size: usize,
42 #[serde(default)]
51 pub ordered: bool,
52}
53
54fn default_batch_size() -> usize {
55 DEFAULT_BATCH_SIZE
56}
57
58impl MongoSinkConfig {
59 pub fn new(
61 connection_uri: impl Into<String>,
62 database: impl Into<String>,
63 collection: impl Into<String>,
64 ) -> Self {
65 Self {
66 connection_uri: connection_uri.into(),
67 database: database.into(),
68 collection: collection.into(),
69 batch_size: DEFAULT_BATCH_SIZE,
70 ordered: false,
71 }
72 }
73
74 pub fn with_ordered(mut self, ordered: bool) -> Self {
76 self.ordered = ordered;
77 self
78 }
79
80 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
86 self.batch_size = batch_size;
87 self
88 }
89}
90
91impl fmt::Debug for MongoSinkConfig {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 f.debug_struct("MongoSinkConfig")
94 .field("connection_uri", &"***")
95 .field("database", &self.database)
96 .field("collection", &self.collection)
97 .field("batch_size", &self.batch_size)
98 .field("ordered", &self.ordered)
99 .finish()
100 }
101}
102
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` stores its arguments verbatim and fills in the defaults.
    #[test]
    fn default_config() {
        let config = MongoSinkConfig::new("mongodb://localhost:27017", "testdb", "users");
        assert_eq!(config.connection_uri, "mongodb://localhost:27017");
        assert_eq!(config.database, "testdb");
        assert_eq!(config.collection, "users");
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
        assert!(!config.ordered);
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = MongoSinkConfig::new("mongodb://localhost:27017", "testdb", "users");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = MongoSinkConfig::new("mongodb://localhost:27017", "testdb", "users")
            .with_batch_size(2000);
        assert_eq!(config.batch_size, 2000);
    }

    /// `with_ordered` flips the flag in both directions and touches nothing else.
    #[test]
    fn with_ordered_overrides_default() {
        let config =
            MongoSinkConfig::new("mongodb://localhost:27017", "testdb", "users").with_ordered(true);
        assert!(config.ordered);
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);

        let config = config.with_ordered(false);
        assert!(!config.ordered);
    }

    #[test]
    fn debug_masks_connection_uri() {
        let config = MongoSinkConfig::new("mongodb://user:secret@host:27017/db", "testdb", "users");
        let debug = format!("{config:?}");
        assert!(debug.contains("***"));
        assert!(!debug.contains("secret"));
        // The whole URI is masked, not just the password part.
        assert!(!debug.contains("mongodb://"));
        // Non-sensitive fields stay visible.
        assert!(debug.contains("testdb"));
        assert!(debug.contains("users"));
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config =
            MongoSinkConfig::new("mongodb://localhost:27017", "db", "c").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = MongoSinkConfig::new("mongodb://localhost:27017", "db", "c")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c",
            "batch_size": 250
        }"#;
        let config: MongoSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_in_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c"
        }"#;
        let config: MongoSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    /// `ordered` is optional in JSON and falls back to `false`.
    #[test]
    fn ordered_defaults_to_false_when_absent_in_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c"
        }"#;
        let config: MongoSinkConfig = serde_json::from_str(json).unwrap();
        assert!(!config.ordered);
    }

    #[test]
    fn ordered_deserializes_from_json() {
        let json = r#"{
            "connection_uri": "mongodb://localhost:27017",
            "database": "db",
            "collection": "c",
            "ordered": true
        }"#;
        let config: MongoSinkConfig = serde_json::from_str(json).unwrap();
        assert!(config.ordered);
    }
}