faucet_sink_mongodb/
sink.rs1use crate::config::MongoSinkConfig;
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use mongodb::Client;
7use mongodb::bson::{self, Bson, Document};
8use serde_json::Value;
9
10pub struct MongoSink {
15 config: MongoSinkConfig,
16 client: Client,
17}
18
19impl MongoSink {
20 pub async fn new(config: MongoSinkConfig) -> Result<Self, FaucetError> {
22 faucet_core::validate_batch_size(config.batch_size)?;
23 let client = Client::with_uri_str(&config.connection_uri)
24 .await
25 .map_err(|e| FaucetError::Config(format!("MongoDB connection failed: {e}")))?;
26
27 Ok(Self { config, client })
28 }
29
30 fn value_to_document(val: &Value) -> Result<Document, FaucetError> {
34 let bson = bson::to_bson(val)
35 .map_err(|e| FaucetError::Sink(format!("failed to convert JSON to BSON: {e}")))?;
36 match bson {
37 Bson::Document(doc) => Ok(doc),
38 other => Err(FaucetError::Sink(format!(
39 "expected a JSON object, got BSON type: {other:?}"
40 ))),
41 }
42 }
43}
44
45#[async_trait]
46impl faucet_core::Sink for MongoSink {
47 fn config_schema(&self) -> serde_json::Value {
48 serde_json::to_value(faucet_core::schema_for!(MongoSinkConfig))
49 .expect("schema serialization")
50 }
51
52 async fn check(
55 &self,
56 ctx: &faucet_core::check::CheckContext,
57 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
58 use faucet_core::check::{CheckReport, Probe};
59
60 let started = std::time::Instant::now();
61 let hint = "check connection_uri / credentials / that the MongoDB server is reachable";
62
63 let db = self.client.database(&self.config.database);
64 let probe =
65 match tokio::time::timeout(ctx.timeout, db.run_command(bson::doc! {"ping": 1})).await {
66 Ok(Ok(_)) => Probe::pass("ping", started.elapsed()),
67 Ok(Err(e)) => Probe::fail_hint("ping", started.elapsed(), e.to_string(), hint),
68 Err(_) => Probe::fail_hint("ping", started.elapsed(), "timed out", hint),
69 };
70 Ok(CheckReport::single(probe))
71 }
72
73 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
82 if records.is_empty() {
83 return Ok(0);
84 }
85
86 let collection = self
87 .client
88 .database(&self.config.database)
89 .collection::<Document>(&self.config.collection);
90
91 let effective_chunk = if self.config.batch_size == 0 {
95 records.len()
96 } else {
97 self.config.batch_size
98 };
99
100 let mut total_written = 0usize;
101
102 for chunk in records.chunks(effective_chunk) {
103 let docs: Vec<Document> = chunk
104 .iter()
105 .map(Self::value_to_document)
106 .collect::<Result<Vec<_>, _>>()?;
107
108 let opts = mongodb::options::InsertManyOptions::builder()
109 .ordered(self.config.ordered)
110 .build();
111 collection
112 .insert_many(&docs)
113 .with_options(opts)
114 .await
115 .map_err(|e| FaucetError::Sink(format!("MongoDB insert_many failed: {e}")))?;
116
117 total_written += docs.len();
118 tracing::debug!(batch_size = docs.len(), "MongoDB batch inserted");
119 }
120
121 tracing::info!(
122 records = total_written,
123 database = %self.config.database,
124 collection = %self.config.collection,
125 "MongoDB write complete"
126 );
127
128 Ok(total_written)
129 }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Top-level string and integer fields are readable from the document.
    #[test]
    fn value_to_document_object() {
        let doc = MongoSink::value_to_document(&json!({"name": "Alice", "age": 30})).unwrap();

        assert_eq!(doc.get_str("name").unwrap(), "Alice");
        assert_eq!(doc.get_i64("age").unwrap(), 30);
    }

    /// A top-level array is rejected with a `Sink` error.
    #[test]
    fn value_to_document_non_object_fails() {
        let outcome = MongoSink::value_to_document(&json!([1, 2, 3]));

        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FaucetError::Sink(_))));
    }

    /// A top-level string is rejected.
    #[test]
    fn value_to_document_string_fails() {
        let outcome = MongoSink::value_to_document(&json!("not an object"));

        assert!(outcome.is_err());
    }

    /// Nested objects become nested documents.
    #[test]
    fn value_to_document_nested() {
        let source = json!({"user": {"name": "Bob"}, "tags": ["a", "b"]});
        let doc = MongoSink::value_to_document(&source).unwrap();

        let user = doc.get_document("user").unwrap();
        assert_eq!(user.get_str("name").unwrap(), "Bob");
    }

    /// An empty object converts to an empty document.
    #[test]
    fn value_to_document_empty_object() {
        let doc = MongoSink::value_to_document(&json!({})).unwrap();

        assert!(doc.is_empty());
    }

    /// JSON null is rejected.
    #[test]
    fn value_to_document_null_fails() {
        let outcome = MongoSink::value_to_document(&Value::Null);

        assert!(outcome.is_err());
    }

    /// A batch size above `MAX_BATCH_SIZE` is refused with a `Config` error
    /// that mentions `batch_size`.
    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut config = MongoSinkConfig::new("mongodb://localhost:27017", "db", "c");
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        let outcome = MongoSink::new(config).await;
        if let Err(faucet_core::FaucetError::Config(m)) = outcome {
            assert!(m.contains("batch_size"), "got: {m}");
        } else {
            panic!("expected a batch_size Config error");
        }
    }
}