faucet_source_mongodb/
stream.rs1use crate::config::MongoSourceConfig;
4use async_trait::async_trait;
5use faucet_core::{FaucetError, Stream, StreamPage};
6use mongodb::Client;
7use mongodb::bson::{self, Bson, Document};
8use mongodb::options::FindOptions;
9use serde_json::Value;
10use std::pin::Pin;
11
12pub struct MongoSource {
17 config: MongoSourceConfig,
18 client: Client,
19}
20
21impl MongoSource {
22 pub async fn new(config: MongoSourceConfig) -> Result<Self, FaucetError> {
27 faucet_core::validate_batch_size(config.batch_size)?;
28 let client = Client::with_uri_str(&config.connection_uri)
29 .await
30 .map_err(|e| FaucetError::Source(format!("MongoDB connection failed: {e}")))?;
31
32 Ok(Self { config, client })
33 }
34
35 pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
37 let db = self.client.database(&self.config.database);
38 let collection = db.collection::<Document>(&self.config.collection);
39
40 let filter = self
41 .config
42 .filter
43 .as_ref()
44 .map(json_value_to_document)
45 .transpose()?;
46
47 let mut find_options = FindOptions::default();
48
49 if let Some(ref proj) = self.config.projection {
50 find_options.projection = Some(json_value_to_document(proj)?);
51 }
52 if let Some(ref sort) = self.config.sort {
53 find_options.sort = Some(json_value_to_document(sort)?);
54 }
55 if let Some(limit) = self.config.limit {
56 find_options.limit = Some(limit);
57 }
58 if let Some(cursor_batch_size) = self.config.cursor_batch_size {
59 find_options.batch_size = Some(cursor_batch_size);
60 }
61
62 let mut cursor = collection
63 .find(filter.unwrap_or_default())
64 .with_options(find_options)
65 .await
66 .map_err(|e| FaucetError::Source(format!("MongoDB find failed: {e}")))?;
67
68 let mut records = Vec::new();
69
70 while cursor
71 .advance()
72 .await
73 .map_err(|e| FaucetError::Source(format!("MongoDB cursor advance failed: {e}")))?
74 {
75 let doc = cursor
76 .deserialize_current()
77 .map_err(|e| FaucetError::Source(format!("MongoDB deserialization failed: {e}")))?;
78
79 let value = bson_document_to_json_value(&doc)?;
80 records.push(value);
81 }
82
83 tracing::info!(
84 records = records.len(),
85 database = %self.config.database,
86 collection = %self.config.collection,
87 "MongoDB fetch complete"
88 );
89
90 Ok(records)
91 }
92}
93
94#[async_trait]
95impl faucet_core::Source for MongoSource {
96 async fn fetch_with_context(
97 &self,
98 context: &std::collections::HashMap<String, serde_json::Value>,
99 ) -> Result<Vec<Value>, FaucetError> {
100 if context.is_empty() {
101 return MongoSource::fetch_all(self).await;
102 }
103
104 let filter = substitute_optional_value(&self.config.filter, context, "filter")?;
106 let projection = substitute_optional_value(&self.config.projection, context, "projection")?;
107 let sort = substitute_optional_value(&self.config.sort, context, "sort")?;
108
109 let db = self.client.database(&self.config.database);
110 let collection = db.collection::<Document>(&self.config.collection);
111
112 let filter_doc = filter.as_ref().map(json_value_to_document).transpose()?;
113
114 let mut find_options = FindOptions::default();
115 if let Some(ref proj) = projection {
116 find_options.projection = Some(json_value_to_document(proj)?);
117 }
118 if let Some(ref s) = sort {
119 find_options.sort = Some(json_value_to_document(s)?);
120 }
121 if let Some(limit) = self.config.limit {
122 find_options.limit = Some(limit);
123 }
124 if let Some(cursor_batch_size) = self.config.cursor_batch_size {
125 find_options.batch_size = Some(cursor_batch_size);
126 }
127
128 let mut cursor = collection
129 .find(filter_doc.unwrap_or_default())
130 .with_options(find_options)
131 .await
132 .map_err(|e| FaucetError::Source(format!("MongoDB find failed: {e}")))?;
133
134 let mut records = Vec::new();
135 while cursor
136 .advance()
137 .await
138 .map_err(|e| FaucetError::Source(format!("MongoDB cursor advance failed: {e}")))?
139 {
140 let doc = cursor
141 .deserialize_current()
142 .map_err(|e| FaucetError::Source(format!("MongoDB deserialization failed: {e}")))?;
143 records.push(bson_document_to_json_value(&doc)?);
144 }
145
146 tracing::info!(
147 records = records.len(),
148 database = %self.config.database,
149 collection = %self.config.collection,
150 "MongoDB fetch complete (with context)"
151 );
152
153 Ok(records)
154 }
155
156 fn stream_pages<'a>(
174 &'a self,
175 context: &'a std::collections::HashMap<String, Value>,
176 _batch_size: usize,
177 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
178 let batch_size = self.config.batch_size;
179
180 Box::pin(async_stream::try_stream! {
181 let (filter, projection, sort) = if context.is_empty() {
184 (
185 self.config.filter.clone(),
186 self.config.projection.clone(),
187 self.config.sort.clone(),
188 )
189 } else {
190 (
191 substitute_optional_value(&self.config.filter, context, "filter")?,
192 substitute_optional_value(&self.config.projection, context, "projection")?,
193 substitute_optional_value(&self.config.sort, context, "sort")?,
194 )
195 };
196
197 let db = self.client.database(&self.config.database);
198 let collection = db.collection::<Document>(&self.config.collection);
199
200 let filter_doc = filter.as_ref().map(json_value_to_document).transpose()?;
201
202 let mut find_options = FindOptions::default();
203 if let Some(ref proj) = projection {
204 find_options.projection = Some(json_value_to_document(proj)?);
205 }
206 if let Some(ref s) = sort {
207 find_options.sort = Some(json_value_to_document(s)?);
208 }
209 if let Some(limit) = self.config.limit {
210 find_options.limit = Some(limit);
211 }
212 if let Some(cursor_batch_size) = self.config.cursor_batch_size {
213 find_options.batch_size = Some(cursor_batch_size);
214 }
215
216 let mut cursor = collection
217 .find(filter_doc.unwrap_or_default())
218 .with_options(find_options)
219 .await
220 .map_err(|e| FaucetError::Source(format!("MongoDB find failed: {e}")))?;
221
222 let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
223 let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
224 let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
225 let mut total = 0usize;
226
227 while cursor
228 .advance()
229 .await
230 .map_err(|e| FaucetError::Source(format!("MongoDB cursor advance failed: {e}")))?
231 {
232 let doc = cursor
233 .deserialize_current()
234 .map_err(|e| FaucetError::Source(format!("MongoDB deserialization failed: {e}")))?;
235 buffer.push(bson_document_to_json_value(&doc)?);
236 if buffer.len() >= chunk {
237 let page = std::mem::replace(&mut buffer, Vec::with_capacity(initial_capacity));
238 total += page.len();
239 yield StreamPage { records: page, bookmark: None };
240 }
241 }
242 if !buffer.is_empty() {
243 total += buffer.len();
244 yield StreamPage { records: buffer, bookmark: None };
245 }
246
247 tracing::info!(
248 records = total,
249 batch_size,
250 database = %self.config.database,
251 collection = %self.config.collection,
252 "MongoDB source stream complete",
253 );
254 })
255 }
256
257 fn config_schema(&self) -> serde_json::Value {
258 serde_json::to_value(faucet_core::schema_for!(MongoSourceConfig))
259 .expect("schema serialization")
260 }
261}
262
263fn substitute_optional_value(
269 value: &Option<Value>,
270 context: &std::collections::HashMap<String, Value>,
271 field_name: &str,
272) -> Result<Option<Value>, FaucetError> {
273 match value {
274 Some(v) => {
275 let s = serde_json::to_string(v).map_err(|e| {
276 FaucetError::Config(format!("failed to serialize {field_name}: {e}"))
277 })?;
278 let s = faucet_core::util::substitute_context_json(&s, context);
279 let resolved = serde_json::from_str(&s).map_err(|e| {
280 FaucetError::Config(format!("failed to parse substituted {field_name}: {e}"))
281 })?;
282 Ok(Some(resolved))
283 }
284 None => Ok(None),
285 }
286}
287
288fn json_value_to_document(val: &Value) -> Result<Document, FaucetError> {
292 let bson = bson::to_bson(val)
293 .map_err(|e| FaucetError::Config(format!("failed to convert JSON to BSON: {e}")))?;
294 match bson {
295 Bson::Document(doc) => Ok(doc),
296 other => Err(FaucetError::Config(format!(
297 "expected a JSON object, got BSON type: {other:?}"
298 ))),
299 }
300}
301
302fn bson_document_to_json_value(doc: &Document) -> Result<Value, FaucetError> {
304 let bson = Bson::Document(doc.clone());
305 let relaxed = bson.into_relaxed_extjson();
306 Ok(relaxed)
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A flat JSON object converts to a document with matching fields.
    #[test]
    fn json_object_to_document() {
        let converted = json_value_to_document(&json!({"name": "Alice", "age": 30})).unwrap();

        assert_eq!(converted.get_str("name").unwrap(), "Alice");
        assert_eq!(converted.get_i64("age").unwrap(), 30);
    }

    /// A JSON array is rejected with a configuration error.
    #[test]
    fn json_non_object_to_document_fails() {
        let outcome = json_value_to_document(&json!([1, 2, 3]));

        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FaucetError::Config(_))));
    }

    /// A bare JSON string is rejected.
    #[test]
    fn json_string_to_document_fails() {
        let outcome = json_value_to_document(&json!("not an object"));

        assert!(outcome.is_err());
    }

    /// String and integer fields survive the BSON-to-JSON conversion.
    #[test]
    fn bson_document_roundtrip() {
        let mut source = Document::new();
        source.insert("name", "Bob");
        source.insert("score", 42);

        let json_value = bson_document_to_json_value(&source).unwrap();

        assert_eq!(json_value["name"], "Bob");
        assert_eq!(json_value["score"], 42);
    }

    /// Nested objects and arrays are preserved in both directions.
    #[test]
    fn nested_document_conversion() {
        let original = json!({"user": {"name": "Alice", "tags": ["admin", "user"]}});

        let as_document = json_value_to_document(&original).unwrap();
        let user = as_document.get_document("user").unwrap();
        assert_eq!(user.get_str("name").unwrap(), "Alice");

        let round_tripped = bson_document_to_json_value(&as_document).unwrap();
        assert_eq!(round_tripped["user"]["name"], "Alice");
        assert_eq!(round_tripped["user"]["tags"][0], "admin");
    }

    /// An empty JSON object becomes an empty document.
    #[test]
    fn empty_filter_converts() {
        let converted = json_value_to_document(&json!({})).unwrap();

        assert!(converted.is_empty());
    }

    /// Construction fails with a `batch_size` configuration error when the
    /// batch size exceeds `faucet_core::MAX_BATCH_SIZE`.
    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut config = MongoSourceConfig::new("mongodb://localhost:27017", "db", "c");
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        let Err(faucet_core::FaucetError::Config(m)) = MongoSource::new(config).await else {
            panic!("expected a batch_size Config error");
        };
        assert!(m.contains("batch_size"), "got: {m}");
    }
}