Skip to main content

faucet_source_mongodb/
stream.rs

1//! MongoDB stream executor.
2
3use crate::config::MongoSourceConfig;
4use async_trait::async_trait;
5use faucet_core::{FaucetError, Stream, StreamPage};
6use mongodb::Client;
7use mongodb::bson::{self, Bson, Document};
8use mongodb::options::FindOptions;
9use serde_json::Value;
10use std::pin::Pin;
11
12/// A configured MongoDB source that connects to a collection and fetches documents.
13///
14/// The MongoDB `Client` is created once during construction and reused across
15/// all `fetch_all()` calls. It maintains an internal connection pool.
16pub struct MongoSource {
17    config: MongoSourceConfig,
18    client: Client,
19}
20
21impl MongoSource {
22    /// Create a new MongoDB source from the given configuration.
23    ///
24    /// This establishes the MongoDB client (with its internal connection pool)
25    /// immediately.
26    pub async fn new(config: MongoSourceConfig) -> Result<Self, FaucetError> {
27        faucet_core::validate_batch_size(config.batch_size)?;
28        let client = Client::with_uri_str(&config.connection_uri)
29            .await
30            .map_err(|e| FaucetError::Source(format!("MongoDB connection failed: {e}")))?;
31
32        Ok(Self { config, client })
33    }
34
35    /// Fetch all matching documents from the configured collection.
36    pub async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
37        let db = self.client.database(&self.config.database);
38        let collection = db.collection::<Document>(&self.config.collection);
39
40        let filter = self
41            .config
42            .filter
43            .as_ref()
44            .map(json_value_to_document)
45            .transpose()?;
46
47        let mut find_options = FindOptions::default();
48
49        if let Some(ref proj) = self.config.projection {
50            find_options.projection = Some(json_value_to_document(proj)?);
51        }
52        if let Some(ref sort) = self.config.sort {
53            find_options.sort = Some(json_value_to_document(sort)?);
54        }
55        if let Some(limit) = self.config.limit {
56            find_options.limit = Some(limit);
57        }
58        if let Some(cursor_batch_size) = self.config.cursor_batch_size {
59            find_options.batch_size = Some(cursor_batch_size);
60        }
61
62        let mut cursor = collection
63            .find(filter.unwrap_or_default())
64            .with_options(find_options)
65            .await
66            .map_err(|e| FaucetError::Source(format!("MongoDB find failed: {e}")))?;
67
68        let mut records = Vec::new();
69
70        while cursor
71            .advance()
72            .await
73            .map_err(|e| FaucetError::Source(format!("MongoDB cursor advance failed: {e}")))?
74        {
75            let doc = cursor
76                .deserialize_current()
77                .map_err(|e| FaucetError::Source(format!("MongoDB deserialization failed: {e}")))?;
78
79            let value = bson_document_to_json_value(&doc)?;
80            records.push(value);
81        }
82
83        tracing::info!(
84            records = records.len(),
85            database = %self.config.database,
86            collection = %self.config.collection,
87            "MongoDB fetch complete"
88        );
89
90        Ok(records)
91    }
92}
93
94#[async_trait]
95impl faucet_core::Source for MongoSource {
96    async fn fetch_with_context(
97        &self,
98        context: &std::collections::HashMap<String, serde_json::Value>,
99    ) -> Result<Vec<Value>, FaucetError> {
100        if context.is_empty() {
101            return MongoSource::fetch_all(self).await;
102        }
103
104        // Substitute context placeholders into filter, projection, and sort.
105        let filter = substitute_optional_value(&self.config.filter, context, "filter")?;
106        let projection = substitute_optional_value(&self.config.projection, context, "projection")?;
107        let sort = substitute_optional_value(&self.config.sort, context, "sort")?;
108
109        let db = self.client.database(&self.config.database);
110        let collection = db.collection::<Document>(&self.config.collection);
111
112        let filter_doc = filter.as_ref().map(json_value_to_document).transpose()?;
113
114        let mut find_options = FindOptions::default();
115        if let Some(ref proj) = projection {
116            find_options.projection = Some(json_value_to_document(proj)?);
117        }
118        if let Some(ref s) = sort {
119            find_options.sort = Some(json_value_to_document(s)?);
120        }
121        if let Some(limit) = self.config.limit {
122            find_options.limit = Some(limit);
123        }
124        if let Some(cursor_batch_size) = self.config.cursor_batch_size {
125            find_options.batch_size = Some(cursor_batch_size);
126        }
127
128        let mut cursor = collection
129            .find(filter_doc.unwrap_or_default())
130            .with_options(find_options)
131            .await
132            .map_err(|e| FaucetError::Source(format!("MongoDB find failed: {e}")))?;
133
134        let mut records = Vec::new();
135        while cursor
136            .advance()
137            .await
138            .map_err(|e| FaucetError::Source(format!("MongoDB cursor advance failed: {e}")))?
139        {
140            let doc = cursor
141                .deserialize_current()
142                .map_err(|e| FaucetError::Source(format!("MongoDB deserialization failed: {e}")))?;
143            records.push(bson_document_to_json_value(&doc)?);
144        }
145
146        tracing::info!(
147            records = records.len(),
148            database = %self.config.database,
149            collection = %self.config.collection,
150            "MongoDB fetch complete (with context)"
151        );
152
153        Ok(records)
154    }
155
156    /// Stream documents from the underlying MongoDB cursor without buffering
157    /// the full result set. Each emitted [`StreamPage`] holds up to
158    /// [`MongoSourceConfig::batch_size`] documents.
159    ///
160    /// The trait-level `batch_size` argument is ignored in favour of the
161    /// config field — the config is the user-facing knob the README
162    /// documents, and routing the pipeline-supplied hint through it would
163    /// silently override an explicit config value.
164    ///
165    /// `batch_size = 0` drains the entire cursor into a single page. The
166    /// MongoDB source has no incremental-replication mode today, so every
167    /// emitted page carries `bookmark: None`.
168    ///
169    /// Note: [`MongoSourceConfig::cursor_batch_size`] is independent — it
170    /// controls the driver's per-round-trip batch size, while `batch_size`
171    /// controls how many documents are buffered before a `StreamPage` is
172    /// yielded to the pipeline.
173    fn stream_pages<'a>(
174        &'a self,
175        context: &'a std::collections::HashMap<String, Value>,
176        _batch_size: usize,
177    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
178        let batch_size = self.config.batch_size;
179
180        Box::pin(async_stream::try_stream! {
181            // Substitute context placeholders into filter, projection, sort
182            // (matching fetch_with_context's behaviour).
183            let (filter, projection, sort) = if context.is_empty() {
184                (
185                    self.config.filter.clone(),
186                    self.config.projection.clone(),
187                    self.config.sort.clone(),
188                )
189            } else {
190                (
191                    substitute_optional_value(&self.config.filter, context, "filter")?,
192                    substitute_optional_value(&self.config.projection, context, "projection")?,
193                    substitute_optional_value(&self.config.sort, context, "sort")?,
194                )
195            };
196
197            let db = self.client.database(&self.config.database);
198            let collection = db.collection::<Document>(&self.config.collection);
199
200            let filter_doc = filter.as_ref().map(json_value_to_document).transpose()?;
201
202            let mut find_options = FindOptions::default();
203            if let Some(ref proj) = projection {
204                find_options.projection = Some(json_value_to_document(proj)?);
205            }
206            if let Some(ref s) = sort {
207                find_options.sort = Some(json_value_to_document(s)?);
208            }
209            if let Some(limit) = self.config.limit {
210                find_options.limit = Some(limit);
211            }
212            if let Some(cursor_batch_size) = self.config.cursor_batch_size {
213                find_options.batch_size = Some(cursor_batch_size);
214            }
215
216            let mut cursor = collection
217                .find(filter_doc.unwrap_or_default())
218                .with_options(find_options)
219                .await
220                .map_err(|e| FaucetError::Source(format!("MongoDB find failed: {e}")))?;
221
222            let chunk = if batch_size == 0 { usize::MAX } else { batch_size };
223            let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
224            let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
225            let mut total = 0usize;
226
227            while cursor
228                .advance()
229                .await
230                .map_err(|e| FaucetError::Source(format!("MongoDB cursor advance failed: {e}")))?
231            {
232                let doc = cursor
233                    .deserialize_current()
234                    .map_err(|e| FaucetError::Source(format!("MongoDB deserialization failed: {e}")))?;
235                buffer.push(bson_document_to_json_value(&doc)?);
236                if buffer.len() >= chunk {
237                    let page = std::mem::replace(&mut buffer, Vec::with_capacity(initial_capacity));
238                    total += page.len();
239                    yield StreamPage { records: page, bookmark: None };
240                }
241            }
242            if !buffer.is_empty() {
243                total += buffer.len();
244                yield StreamPage { records: buffer, bookmark: None };
245            }
246
247            tracing::info!(
248                records = total,
249                batch_size,
250                database = %self.config.database,
251                collection = %self.config.collection,
252                "MongoDB source stream complete",
253            );
254        })
255    }
256
257    fn config_schema(&self) -> serde_json::Value {
258        serde_json::to_value(faucet_core::schema_for!(MongoSourceConfig))
259            .expect("schema serialization")
260    }
261}
262
263/// Substitute context placeholders in an optional JSON value.
264///
265/// Serialises the value to a string, runs [`substitute_context_json`] (which
266/// properly escapes string values for JSON safety), then deserialises back.
267/// Returns `None` when the input is `None`.
268fn substitute_optional_value(
269    value: &Option<Value>,
270    context: &std::collections::HashMap<String, Value>,
271    field_name: &str,
272) -> Result<Option<Value>, FaucetError> {
273    match value {
274        Some(v) => {
275            let s = serde_json::to_string(v).map_err(|e| {
276                FaucetError::Config(format!("failed to serialize {field_name}: {e}"))
277            })?;
278            let s = faucet_core::util::substitute_context_json(&s, context);
279            let resolved = serde_json::from_str(&s).map_err(|e| {
280                FaucetError::Config(format!("failed to parse substituted {field_name}: {e}"))
281            })?;
282            Ok(Some(resolved))
283        }
284        None => Ok(None),
285    }
286}
287
288/// Convert a `serde_json::Value` to a `bson::Document`.
289///
290/// The value must be a JSON object; other types produce a `Config` error.
291fn json_value_to_document(val: &Value) -> Result<Document, FaucetError> {
292    let bson = bson::to_bson(val)
293        .map_err(|e| FaucetError::Config(format!("failed to convert JSON to BSON: {e}")))?;
294    match bson {
295        Bson::Document(doc) => Ok(doc),
296        other => Err(FaucetError::Config(format!(
297            "expected a JSON object, got BSON type: {other:?}"
298        ))),
299    }
300}
301
302/// Convert a `bson::Document` to a `serde_json::Value`.
303fn bson_document_to_json_value(doc: &Document) -> Result<Value, FaucetError> {
304    let bson = Bson::Document(doc.clone());
305    let relaxed = bson.into_relaxed_extjson();
306    Ok(relaxed)
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A flat JSON object converts to a document with typed fields.
    #[test]
    fn json_object_to_document() {
        let doc = json_value_to_document(&json!({"name": "Alice", "age": 30})).unwrap();

        let name = doc.get_str("name").unwrap();
        let age = doc.get_i64("age").unwrap();
        assert_eq!(name, "Alice");
        assert_eq!(age, 30);
    }

    /// A JSON array is rejected with a `Config` error.
    #[test]
    fn json_non_object_to_document_fails() {
        let err = json_value_to_document(&json!([1, 2, 3])).unwrap_err();
        assert!(matches!(err, FaucetError::Config(_)));
    }

    /// A bare JSON string is rejected.
    #[test]
    fn json_string_to_document_fails() {
        assert!(json_value_to_document(&json!("not an object")).is_err());
    }

    /// Scalar fields survive the BSON -> JSON conversion.
    #[test]
    fn bson_document_roundtrip() {
        let mut source = Document::new();
        source.insert("name", "Bob");
        source.insert("score", 42);

        let json_view = bson_document_to_json_value(&source).unwrap();
        assert_eq!(json_view["name"], "Bob");
        assert_eq!(json_view["score"], 42);
    }

    /// Nested objects and arrays convert in both directions.
    #[test]
    fn nested_document_conversion() {
        let input = json!({"user": {"name": "Alice", "tags": ["admin", "user"]}});

        let doc = json_value_to_document(&input).unwrap();
        let user = doc.get_document("user").unwrap();
        assert_eq!(user.get_str("name").unwrap(), "Alice");

        let restored = bson_document_to_json_value(&doc).unwrap();
        assert_eq!(restored["user"]["name"], "Alice");
        assert_eq!(restored["user"]["tags"][0], "admin");
    }

    /// An empty JSON object becomes an empty document.
    #[test]
    fn empty_filter_converts() {
        let doc = json_value_to_document(&json!({})).unwrap();
        assert!(doc.is_empty());
    }

    /// Construction fails with a `Config` error naming `batch_size` when the
    /// configured batch size exceeds `faucet_core::MAX_BATCH_SIZE`.
    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut config = MongoSourceConfig::new("mongodb://localhost:27017", "db", "c");
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;

        if let Err(faucet_core::FaucetError::Config(m)) = MongoSource::new(config).await {
            assert!(m.contains("batch_size"), "got: {m}");
        } else {
            panic!("expected a batch_size Config error");
        }
    }
}
377}