use faucet_core::DEFAULT_BATCH_SIZE;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt;
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
pub struct MongoSourceConfig {
pub connection_uri: String,
pub database: String,
pub collection: String,
pub filter: Option<Value>,
pub projection: Option<Value>,
pub sort: Option<Value>,
pub limit: Option<i64>,
pub cursor_batch_size: Option<u32>,
#[serde(default = "default_batch_size")]
pub batch_size: usize,
}
fn default_batch_size() -> usize {
DEFAULT_BATCH_SIZE
}
impl MongoSourceConfig {
pub fn new(
connection_uri: impl Into<String>,
database: impl Into<String>,
collection: impl Into<String>,
) -> Self {
Self {
connection_uri: connection_uri.into(),
database: database.into(),
collection: collection.into(),
filter: None,
projection: None,
sort: None,
limit: None,
cursor_batch_size: None,
batch_size: DEFAULT_BATCH_SIZE,
}
}
pub fn filter(mut self, filter: Value) -> Self {
self.filter = Some(filter);
self
}
pub fn projection(mut self, projection: Value) -> Self {
self.projection = Some(projection);
self
}
pub fn sort(mut self, sort: Value) -> Self {
self.sort = Some(sort);
self
}
pub fn limit(mut self, limit: i64) -> Self {
self.limit = Some(limit);
self
}
pub fn cursor_batch_size(mut self, cursor_batch_size: u32) -> Self {
self.cursor_batch_size = Some(cursor_batch_size);
self
}
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
}
impl fmt::Debug for MongoSourceConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MongoSourceConfig")
.field("connection_uri", &"***")
.field("database", &self.database)
.field("collection", &self.collection)
.field("filter", &self.filter)
.field("projection", &self.projection)
.field("sort", &self.sort)
.field("limit", &self.limit)
.field("cursor_batch_size", &self.cursor_batch_size)
.field("batch_size", &self.batch_size)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn default_config() {
let config = MongoSourceConfig::new("mongodb://localhost:27017", "testdb", "users");
assert_eq!(config.database, "testdb");
assert_eq!(config.collection, "users");
assert!(config.filter.is_none());
assert!(config.projection.is_none());
assert!(config.sort.is_none());
assert!(config.limit.is_none());
assert!(config.cursor_batch_size.is_none());
assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
}
#[test]
fn builder_methods() {
let config = MongoSourceConfig::new("mongodb://localhost:27017", "testdb", "users")
.filter(json!({"active": true}))
.projection(json!({"_id": 0, "name": 1}))
.sort(json!({"name": 1}))
.limit(500)
.cursor_batch_size(100)
.with_batch_size(2000);
assert_eq!(config.filter.unwrap(), json!({"active": true}));
assert_eq!(config.projection.unwrap(), json!({"_id": 0, "name": 1}));
assert_eq!(config.sort.unwrap(), json!({"name": 1}));
assert_eq!(config.limit, Some(500));
assert_eq!(config.cursor_batch_size, Some(100));
assert_eq!(config.batch_size, 2000);
}
#[test]
fn debug_masks_connection_uri() {
let config =
MongoSourceConfig::new("mongodb://user:secret@host:27017/db", "testdb", "users");
let debug = format!("{config:?}");
assert!(debug.contains("***"));
assert!(!debug.contains("secret"));
}
#[test]
fn batch_size_defaults_to_default_batch_size() {
let config = MongoSourceConfig::new("mongodb://localhost:27017", "db", "c");
assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
}
#[test]
fn with_batch_size_overrides_default() {
let config =
MongoSourceConfig::new("mongodb://localhost:27017", "db", "c").with_batch_size(500);
assert_eq!(config.batch_size, 500);
}
#[test]
fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
let config =
MongoSourceConfig::new("mongodb://localhost:27017", "db", "c").with_batch_size(0);
assert_eq!(config.batch_size, 0);
assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
}
#[test]
fn batch_size_above_max_is_rejected_by_validate_batch_size() {
let config = MongoSourceConfig::new("mongodb://localhost:27017", "db", "c")
.with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
}
#[test]
fn batch_size_deserializes_from_json() {
let json = r#"{
"connection_uri": "mongodb://localhost:27017",
"database": "db",
"collection": "c",
"batch_size": 250
}"#;
let config: MongoSourceConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.batch_size, 250);
assert!(config.cursor_batch_size.is_none());
}
#[test]
fn cursor_batch_size_deserializes_from_json() {
let json = r#"{
"connection_uri": "mongodb://localhost:27017",
"database": "db",
"collection": "c",
"cursor_batch_size": 100,
"batch_size": 500
}"#;
let config: MongoSourceConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.cursor_batch_size, Some(100));
assert_eq!(config.batch_size, 500);
}
}