faucet-source-mongodb
A MongoDB source that executes find() queries with filter, projection, sort, and limit, returning documents as JSON records.
Part of the faucet-stream ecosystem.
Installation
[]
= "1.0"
= { = "1", = ["full"] }
Or via the umbrella crate:
= { = "1.0", = ["source-mongodb"] }
Quick Start
use ;
use Source;
async
Configuration
MongoSourceConfig
| Field | Type | Default | Description |
|---|---|---|---|
| `connection_uri` | `String` | (required) | MongoDB connection URI (e.g. `mongodb://localhost:27017`). Masked in debug output for security |
| `database` | `String` | (required) | Database name |
| `collection` | `String` | (required) | Collection name |
| `filter` | `Option<Value>` | `None` | Query filter as a JSON object, converted to a BSON Document at query time |
| `projection` | `Option<Value>` | `None` | Field projection as a JSON object (e.g. `{"_id": 0, "name": 1}`) |
| `sort` | `Option<Value>` | `None` | Sort specification as a JSON object (e.g. `{"created_at": -1}`) |
| `limit` | `Option<i64>` | `None` | Maximum number of documents to return |
| `cursor_batch_size` | `Option<u32>` | `None` | MongoDB driver cursor batch size (documents per server round-trip). Wire-level tuning knob; see Streaming and batching for how this differs from `batch_size` |
| `batch_size` | `usize` | `1000` | Records per `StreamPage` emitted by `Source::stream_pages`. See Streaming and batching below |
⚠️ Breaking rename: the previous `batch_size: Option<u32>` field has been renamed to `cursor_batch_size` so that the new standardised `batch_size: usize` (records per `StreamPage`) can live alongside it. Existing JSON/YAML configs using `batch_size: <n>` now configure the pipeline page size. If you relied on it as the driver cursor knob, rename it to `cursor_batch_size`.
Streaming and batching
MongoSource::stream_pages drives the MongoDB driver cursor (Collection::find)
without buffering the full result. Documents are accumulated into a
batch_size buffer and yielded as a StreamPage once the buffer fills;
the trailing partial page (if any) is yielded after the cursor drains.
batch_size = 0 is the "no batching" sentinel — the cursor is drained
completely and the entire result set is emitted in a single StreamPage.
Use it for small lookup tables, or for downstream sinks (SQL COPY,
BigQuery load jobs, Snowflake stage uploads) that prefer one large request
to many small ones. Values larger than MAX_BATCH_SIZE (1,000,000) are
rejected by faucet_core::validate_batch_size.
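
The paging rules above can be sketched without the driver. This is a minimal illustration under stated assumptions, not the crate's implementation: `paginate` and `check_batch_size` are hypothetical names, a plain iterator stands in for the MongoDB cursor, and a `Vec<T>` stands in for a `StreamPage`. Only the limit of 1,000,000 is taken from the text. For brevity the sketch collects all pages into a `Vec`, whereas the source described here yields each page as soon as it fills.

```rust
/// Upper bound quoted in the text for `batch_size`.
const MAX_BATCH_SIZE: usize = 1_000_000;

/// Hypothetical stand-in for `faucet_core::validate_batch_size`:
/// accepts 0 (no batching) and any value up to MAX_BATCH_SIZE.
fn check_batch_size(batch_size: usize) -> Result<usize, String> {
    if batch_size > MAX_BATCH_SIZE {
        Err(format!(
            "batch_size {} exceeds MAX_BATCH_SIZE ({})",
            batch_size, MAX_BATCH_SIZE
        ))
    } else {
        Ok(batch_size)
    }
}

/// Groups items from `cursor` into pages of `batch_size` items.
/// `batch_size == 0` drains the cursor into a single page.
/// Assumption: an empty cursor produces no page at all.
fn paginate<T>(cursor: impl Iterator<Item = T>, batch_size: usize) -> Vec<Vec<T>> {
    let mut pages = Vec::new();
    let mut buffer = Vec::new();
    for doc in cursor {
        buffer.push(doc);
        // A full buffer becomes a page; with batch_size == 0 it never fills.
        if batch_size != 0 && buffer.len() == batch_size {
            pages.push(std::mem::take(&mut buffer));
        }
    }
    // Trailing partial page, or the whole result set when batch_size == 0.
    if !buffer.is_empty() {
        pages.push(buffer);
    }
    pages
}
```

With five documents, `paginate(1..=5, 2)` produces pages of 2, 2, and 1 items, while `paginate(1..=5, 0)` produces one page holding all five.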
cursor_batch_size is the MongoDB driver's per-round-trip batch size
(find().batch_size(N)). It is independent of the pipeline-level
batch_size: you can have the driver fetch 100 docs per round-trip and
still yield StreamPages of 1000 docs (or vice versa), trading network
round-trips for buffering. Leave it unset to use the MongoDB driver's
default (101 documents).
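
To make the trade-off concrete, the following back-of-the-envelope sketch counts driver batches and emitted pages for a given result size. The function names are hypothetical, and the batch count is an approximation: it assumes every driver batch holds exactly `cursor_batch_size` documents and ignores server-side size caps and any final empty fetch.

```rust
/// Ceiling division; `per` must be non-zero.
fn ceil_div(total: u64, per: u64) -> u64 {
    (total + per - 1) / per
}

/// Approximate number of driver batches (server round-trips) needed to
/// fetch `total_docs` when each batch holds `cursor_batch_size` documents.
/// `cursor_batch_size` must be non-zero.
fn driver_batches(total_docs: u64, cursor_batch_size: u64) -> u64 {
    ceil_div(total_docs, cursor_batch_size)
}

/// Number of pages emitted for `total_docs` at a given pipeline `batch_size`.
/// `batch_size == 0` means a single page holding the whole result set.
fn pages_emitted(total_docs: u64, batch_size: u64) -> u64 {
    if total_docs == 0 {
        0
    } else if batch_size == 0 {
        1
    } else {
        ceil_div(total_docs, batch_size)
    }
}
```

For 10,000 documents, `cursor_batch_size = 100` with `batch_size = 1000` gives roughly 100 round-trips and 10 pages; swapping the two values gives roughly 10 round-trips and 100 pages.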
The MongoDB source has no incremental-replication mode, so every emitted
page carries bookmark: None.
BSON Conversion
All JSON filter, projection, and sort values are automatically converted to BSON documents at query time. Each value must be a JSON object; arrays and scalars produce an error. Documents returned from MongoDB are converted to relaxed Extended JSON.
Config Loading
use ;
use MongoSourceConfig;
let config: MongoSourceConfig = load_json?;
let config: MongoSourceConfig = load_env_file?;
Example JSON config
Example .env file
MONGO_SOURCE_CONNECTION_URI=mongodb://localhost:27017
MONGO_SOURCE_DATABASE=mydb
MONGO_SOURCE_COLLECTION=users
MONGO_SOURCE_LIMIT=5000
MONGO_SOURCE_CURSOR_BATCH_SIZE=200
MONGO_SOURCE_BATCH_SIZE=1000
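
The variable names in the example follow a visible pattern: the `MONGO_SOURCE_` prefix plus the upper-cased field name. The sketch below shows that mapping with the standard library only. It is an illustration of the naming pattern, not the crate's loader: `fields_from_env` is a hypothetical helper, and the handling of comments and unrelated variables is an assumption.

```rust
use std::collections::BTreeMap;

/// Prefix seen on every variable in the example .env file.
const PREFIX: &str = "MONGO_SOURCE_";

/// Parses `KEY=VALUE` lines and returns the values keyed by the
/// lower-cased field name (prefix removed). Blank lines, `#` comments,
/// and variables without the prefix are skipped.
fn fields_from_env(text: &str) -> BTreeMap<String, String> {
    let mut fields = BTreeMap::new();
    for line in text.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        // Split on the first '=' only, so values may contain '='.
        if let Some((key, value)) = line.split_once('=') {
            if let Some(field) = key.trim().strip_prefix(PREFIX) {
                fields.insert(field.to_lowercase(), value.trim().to_string());
            }
        }
    }
    fields
}
```

Applied to the file above, this would map `MONGO_SOURCE_CONNECTION_URI` to the `connection_uri` field and `MONGO_SOURCE_CURSOR_BATCH_SIZE` to `cursor_batch_size`; values remain strings here and would still need parsing into their declared types.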
Config Schema Introspection
use Source;
let source = new.await?;
let schema = source.config_schema;
println!;
Examples
Simple collection read
use ;
use Source;
let config = new;
let source = new.await?;
let customers = source.fetch_all.await?;
Filtered query with projection and sort
use ;
use Source;
use json;
let config = new
.filter
.projection
.sort
.limit
.cursor_batch_size;
let source = new.await?;
let orders = source.fetch_all.await?;
println!;
Using with a Pipeline
use ;
use ;
use json;
let config = new
.filter
.sort
.limit;
let source = new.await?;
let pipeline = new;
let result = pipeline.run.await?;
License
Licensed under MIT or Apache-2.0.