Skip to main content

faucet_sink_mongodb/
sink.rs

1//! MongoDB sink implementation.
2
3use crate::config::MongoSinkConfig;
4use async_trait::async_trait;
5use faucet_core::FaucetError;
6use mongodb::Client;
7use mongodb::bson::{self, Bson, Document};
8use serde_json::Value;
9
10/// A sink that inserts JSON records into a MongoDB collection.
11///
12/// Each record must be a JSON object. Non-object values produce an error.
13/// Records are inserted in batches using `insert_many`.
14pub struct MongoSink {
15    config: MongoSinkConfig,
16    client: Client,
17}
18
19impl MongoSink {
20    /// Create a new MongoDB sink, establishing the client connection.
21    pub async fn new(config: MongoSinkConfig) -> Result<Self, FaucetError> {
22        faucet_core::validate_batch_size(config.batch_size)?;
23        let client = Client::with_uri_str(&config.connection_uri)
24            .await
25            .map_err(|e| FaucetError::Config(format!("MongoDB connection failed: {e}")))?;
26
27        Ok(Self { config, client })
28    }
29
30    /// Convert a `serde_json::Value` to a `bson::Document`.
31    ///
32    /// Returns a `Sink` error if the value is not a JSON object.
33    fn value_to_document(val: &Value) -> Result<Document, FaucetError> {
34        let bson = bson::to_bson(val)
35            .map_err(|e| FaucetError::Sink(format!("failed to convert JSON to BSON: {e}")))?;
36        match bson {
37            Bson::Document(doc) => Ok(doc),
38            other => Err(FaucetError::Sink(format!(
39                "expected a JSON object, got BSON type: {other:?}"
40            ))),
41        }
42    }
43}
44
45#[async_trait]
46impl faucet_core::Sink for MongoSink {
47    fn config_schema(&self) -> serde_json::Value {
48        serde_json::to_value(faucet_core::schema_for!(MongoSinkConfig))
49            .expect("schema serialization")
50    }
51
52    /// Non-mutating preflight probe: run the `ping` admin command against the
53    /// configured database via the existing client (probe name `"ping"`).
54    async fn check(
55        &self,
56        ctx: &faucet_core::check::CheckContext,
57    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
58        use faucet_core::check::{CheckReport, Probe};
59
60        let started = std::time::Instant::now();
61        let hint = "check connection_uri / credentials / that the MongoDB server is reachable";
62
63        let db = self.client.database(&self.config.database);
64        let probe =
65            match tokio::time::timeout(ctx.timeout, db.run_command(bson::doc! {"ping": 1})).await {
66                Ok(Ok(_)) => Probe::pass("ping", started.elapsed()),
67                Ok(Err(e)) => Probe::fail_hint("ping", started.elapsed(), e.to_string(), hint),
68                Err(_) => Probe::fail_hint("ping", started.elapsed(), "timed out", hint),
69            };
70        Ok(CheckReport::single(probe))
71    }
72
73    /// Write records to MongoDB.
74    ///
75    /// When `config.batch_size > 0` and the input slice is larger than
76    /// `batch_size`, the slice is split into chunks of `batch_size` documents
77    /// and each chunk is sent as a separate `insert_many` call. When
78    /// `config.batch_size == 0`, the entire slice is sent in a single
79    /// `insert_many` request — useful when upstream `StreamPage`s are already
80    /// sized for MongoDB's preferred per-request limits.
81    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
82        if records.is_empty() {
83            return Ok(0);
84        }
85
86        let collection = self
87            .client
88            .database(&self.config.database)
89            .collection::<Document>(&self.config.collection);
90
91        // `batch_size = 0` is the "no batching" sentinel: forward whatever
92        // upstream handed us as a single `insert_many`, preserving
93        // `StreamPage` framing. Otherwise re-chunk into `batch_size` slices.
94        let effective_chunk = if self.config.batch_size == 0 {
95            records.len()
96        } else {
97            self.config.batch_size
98        };
99
100        let mut total_written = 0usize;
101
102        for chunk in records.chunks(effective_chunk) {
103            let docs: Vec<Document> = chunk
104                .iter()
105                .map(Self::value_to_document)
106                .collect::<Result<Vec<_>, _>>()?;
107
108            let opts = mongodb::options::InsertManyOptions::builder()
109                .ordered(self.config.ordered)
110                .build();
111            collection
112                .insert_many(&docs)
113                .with_options(opts)
114                .await
115                .map_err(|e| FaucetError::Sink(format!("MongoDB insert_many failed: {e}")))?;
116
117            total_written += docs.len();
118            tracing::debug!(batch_size = docs.len(), "MongoDB batch inserted");
119        }
120
121        tracing::info!(
122            records = total_written,
123            database = %self.config.database,
124            collection = %self.config.collection,
125            "MongoDB write complete"
126        );
127
128        Ok(total_written)
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Shorthand for the conversion under test.
    fn to_doc(val: &Value) -> Result<Document, FaucetError> {
        MongoSink::value_to_document(val)
    }

    #[test]
    fn value_to_document_object() {
        let doc = to_doc(&json!({"name": "Alice", "age": 30})).unwrap();
        assert_eq!(doc.get_str("name").unwrap(), "Alice");
        assert_eq!(doc.get_i64("age").unwrap(), 30);
    }

    #[test]
    fn value_to_document_non_object_fails() {
        let outcome = to_doc(&json!([1, 2, 3]));
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(FaucetError::Sink(_))));
    }

    #[test]
    fn value_to_document_string_fails() {
        assert!(to_doc(&json!("not an object")).is_err());
    }

    #[test]
    fn value_to_document_nested() {
        let doc = to_doc(&json!({"user": {"name": "Bob"}, "tags": ["a", "b"]})).unwrap();
        let user = doc.get_document("user").unwrap();
        assert_eq!(user.get_str("name").unwrap(), "Bob");
    }

    #[test]
    fn value_to_document_empty_object() {
        assert!(to_doc(&json!({})).unwrap().is_empty());
    }

    #[test]
    fn value_to_document_null_fails() {
        assert!(to_doc(&Value::Null).is_err());
    }

    #[tokio::test]
    async fn new_rejects_out_of_range_batch_size() {
        let mut config = MongoSinkConfig::new("mongodb://localhost:27017", "db", "c");
        config.batch_size = faucet_core::MAX_BATCH_SIZE + 1;
        let Err(FaucetError::Config(msg)) = MongoSink::new(config).await else {
            panic!("expected a batch_size Config error");
        };
        assert!(msg.contains("batch_size"), "got: {msg}");
    }
}