faucet-source-mongodb 1.0.0

MongoDB source connector for the faucet-stream ecosystem

A MongoDB source that executes find() queries with filter, projection, sort, and limit, returning documents as JSON records.

Part of the faucet-stream ecosystem.

Installation

[dependencies]
faucet-source-mongodb = "1.0"
tokio = { version = "1", features = ["full"] }

Or via the umbrella crate:

faucet-stream = { version = "1.0", features = ["source-mongodb"] }

Quick Start

use faucet_source_mongodb::{MongoSource, MongoSourceConfig};
use faucet_core::Source;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = MongoSourceConfig::new(
        "mongodb://localhost:27017",
        "my_database",
        "users",
    );

    let source = MongoSource::new(config).await?;
    let records = source.fetch_all().await?;

    for record in &records {
        println!("{}", record);
    }
    Ok(())
}

Configuration

MongoSourceConfig

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| connection_uri | String | (required) | MongoDB connection URI (e.g. mongodb://localhost:27017). Masked in debug output for security. |
| database | String | (required) | Database name. |
| collection | String | (required) | Collection name. |
| filter | Option<Value> | None | Query filter as a JSON object, converted to a BSON Document at query time. |
| projection | Option<Value> | None | Field projection as a JSON object (e.g. {"_id": 0, "name": 1}). |
| sort | Option<Value> | None | Sort specification as a JSON object (e.g. {"created_at": -1}). |
| limit | Option<i64> | None | Maximum number of documents to return. |
| cursor_batch_size | Option<u32> | None | MongoDB driver cursor batch size (documents per server round-trip). Wire-level tuning knob; see Streaming and batching for how this differs from batch_size. |
| batch_size | usize | 1000 | Records per StreamPage emitted by Source::stream_pages. See Streaming and batching below. |
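
The table notes that connection_uri is masked in debug output. One common way to get that behaviour in Rust is a hand-written Debug impl. The sketch below assumes a hypothetical ExampleConfig type and a fixed "***" placeholder; it is not the crate's actual implementation, which may mask the URI differently.

```rust
use std::fmt;

// Hypothetical config type used only for this illustration.
struct ExampleConfig {
    connection_uri: String,
    database: String,
}

impl fmt::Debug for ExampleConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ExampleConfig")
            // Never print the real URI: it may embed a username and password.
            .field("connection_uri", &"***")
            .field("database", &self.database)
            .finish()
    }
}
```

With an impl like this, logging a config with {:?} shows the database name but not the credentials embedded in the URI.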

⚠️ Breaking rename: the previous batch_size: Option<u32> field has been renamed to cursor_batch_size so that the new standardised batch_size: usize (records per StreamPage) can live alongside it. Existing JSON/YAML configs using batch_size: <n> now configure the pipeline page size — if you relied on it as the driver cursor knob, rename it to cursor_batch_size.

Streaming and batching

MongoSource::stream_pages drives the MongoDB driver cursor (Collection::find) without buffering the full result. Documents are accumulated into a batch_size buffer and yielded as a StreamPage once the buffer fills; the trailing partial page (if any) is yielded after the cursor drains.
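
The accumulate-and-yield behaviour described above can be sketched with a plain synchronous iterator standing in for the async driver cursor. Pages and pages are hypothetical names for this illustration and are not part of the crate; the real source yields StreamPage values from an async stream.

```rust
use std::iter::Fuse;

// Hypothetical adapter: groups items from `docs` into pages of `batch_size`.
struct Pages<I: Iterator> {
    docs: Fuse<I>,
    batch_size: usize,
}

fn pages<I: Iterator>(docs: I, batch_size: usize) -> Pages<I> {
    Pages { docs: docs.fuse(), batch_size }
}

impl<I: Iterator> Iterator for Pages<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut buffer = Vec::new();
        for doc in self.docs.by_ref() {
            buffer.push(doc);
            // Yield a full page as soon as the buffer reaches batch_size.
            // A batch_size of 0 never triggers this early return.
            if self.batch_size != 0 && buffer.len() == self.batch_size {
                return Some(buffer);
            }
        }
        // The input is drained: yield the trailing partial page, if any.
        if buffer.is_empty() { None } else { Some(buffer) }
    }
}
```

Only one page is held in memory at a time, so peak buffering in this sketch is bounded by batch_size rather than by the size of the result set.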

batch_size = 0 is the "no batching" sentinel — the cursor is drained completely and the entire result set is emitted in a single StreamPage. Use it for small lookup tables, or for downstream sinks (SQL COPY, BigQuery load jobs, Snowflake stage uploads) that prefer one large request to many small ones. Values larger than MAX_BATCH_SIZE (1,000,000) are rejected by faucet_core::validate_batch_size.
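
As a rough illustration of the bounds check described above, the sketch below accepts 0 and rejects anything over the limit. check_batch_size is a hypothetical stand-in: the real signature and error type of faucet_core::validate_batch_size are not shown in this README.

```rust
/// Upper bound quoted in the text above.
const MAX_BATCH_SIZE: usize = 1_000_000;

/// Hypothetical stand-in for the validation step; the String error is an
/// assumption made for this sketch.
fn check_batch_size(batch_size: usize) -> Result<usize, String> {
    if batch_size > MAX_BATCH_SIZE {
        return Err(format!(
            "batch_size {batch_size} exceeds the maximum of {MAX_BATCH_SIZE}"
        ));
    }
    // 0 is accepted: it is the "no batching" sentinel.
    Ok(batch_size)
}
```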

cursor_batch_size is the MongoDB driver's per-round-trip batch size (find().batch_size(N)). It is independent of the pipeline-level batch_size: you can have the driver fetch 100 docs per round-trip and still yield StreamPages of 1000 docs (or vice versa), trading network round-trips for buffering. Leave it unset to use MongoDB's default, which is 101 documents for the first batch; later batches are bounded by the 16 MiB reply size limit rather than a document count.
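
To make the trade-off concrete, the following sketch estimates how many driver round-trips and how many StreamPages a read produces. estimate is a hypothetical helper, and the round-trip count is approximate: the driver may issue one extra request to learn that the cursor is exhausted, and batches are also capped by message size.

```rust
/// Integer division rounding up; `d` must be non-zero.
fn ceil_div(n: u64, d: u64) -> u64 {
    (n + d - 1) / d
}

/// Rough counts for reading `total_docs` documents.
/// Returns (approximate driver round-trips, StreamPages emitted).
/// `cursor_batch_size` must be non-zero.
fn estimate(total_docs: u64, cursor_batch_size: u64, batch_size: u64) -> (u64, u64) {
    let round_trips = ceil_div(total_docs, cursor_batch_size);
    let pages = if batch_size == 0 {
        // "No batching" sentinel: the whole result set goes into one page.
        1
    } else {
        ceil_div(total_docs, batch_size)
    };
    (round_trips, pages)
}
```

For 10,000 documents, estimate(10_000, 100, 1_000) gives about 100 round-trips and 10 pages, while swapping the two sizes gives about 10 round-trips and 100 pages.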

The MongoDB source has no incremental-replication mode, so every emitted page carries bookmark: None.

BSON Conversion

All JSON filter, projection, and sort values are converted to BSON documents at query time. Each value must be a JSON object; arrays and scalars produce an error. Documents returned from MongoDB are converted back to relaxed Extended JSON, so BSON-only types appear as wrapper objects (for example, an ObjectId is rendered as {"$oid": "<24-character hex string>"}).
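
The "objects only" rule can be pictured with the sketch below. To stay dependency-free it models JSON with a small hypothetical Json enum instead of the real JSON value type, and require_object is an illustrative name; the crate's actual conversion code and error messages may differ.

```rust
// Minimal stand-in for a JSON value, used only for this illustration.
#[allow(dead_code)]
#[derive(Debug)]
enum Json {
    Null,
    Bool(bool),
    Number(f64),
    Text(String),
    Array(Vec<Json>),
    Object(Vec<(String, Json)>),
}

/// Returns the object's fields, or an error naming the offending option.
fn require_object<'a>(name: &str, value: &'a Json) -> Result<&'a [(String, Json)], String> {
    match value {
        Json::Object(fields) => Ok(fields.as_slice()),
        Json::Array(_) => Err(format!("{name} must be a JSON object, got an array")),
        _ => Err(format!("{name} must be a JSON object, got a scalar")),
    }
}
```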

Config Loading

use faucet_core::config::{load_json, load_env_file};
use faucet_source_mongodb::MongoSourceConfig;

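// Option 1: load from a JSON file.
let config: MongoSourceConfig = load_json("config.json")?;
// Option 2: load from a .env file whose variables start with MONGO_SOURCE_.
let config: MongoSourceConfig = load_env_file(".env", "MONGO_SOURCE")?;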

Example JSON config

{
  "connection_uri": "mongodb://admin:password@mongo.example.com:27017",
  "database": "analytics",
  "collection": "events",
  "filter": {
    "event_type": "purchase",
    "timestamp": { "$gte": "2025-01-01T00:00:00Z" }
  },
  "projection": {
    "_id": 0,
    "user_id": 1,
    "event_type": 1,
    "amount": 1,
    "timestamp": 1
  },
  "sort": { "timestamp": -1 },
  "limit": 10000,
  "cursor_batch_size": 500,
  "batch_size": 5000
}

Example .env file

MONGO_SOURCE_CONNECTION_URI=mongodb://localhost:27017
MONGO_SOURCE_DATABASE=mydb
MONGO_SOURCE_COLLECTION=users
MONGO_SOURCE_LIMIT=5000
MONGO_SOURCE_CURSOR_BATCH_SIZE=200
MONGO_SOURCE_BATCH_SIZE=1000
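
Judging from the variable names above, each key appears to be the prefix, an underscore, and the upper-cased field name. The sketch below illustrates that mapping under this assumption; fields_from_env is a hypothetical helper, and load_env_file may treat quoting, comments, and value types differently.

```rust
use std::collections::BTreeMap;

/// Hypothetical illustration of prefix-based mapping from .env keys to
/// config field names (e.g. MONGO_SOURCE_LIMIT -> limit).
fn fields_from_env(contents: &str, prefix: &str) -> BTreeMap<String, String> {
    let prefix = format!("{prefix}_");
    contents
        .lines()
        .map(str::trim)
        // Skip blank lines and comment lines.
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        // Split at the first '=' so values may themselves contain '='.
        .filter_map(|line| line.split_once('='))
        .filter_map(|(key, value)| {
            // Keep only keys under the prefix and lower-case the remainder.
            key.trim()
                .strip_prefix(prefix.as_str())
                .map(|field| (field.to_ascii_lowercase(), value.trim().to_string()))
        })
        .collect()
}
```

Values are kept as strings here; turning "5000" into an integer limit would be a separate deserialization step.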

Config Schema Introspection

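use faucet_core::Source;
use faucet_source_mongodb::MongoSource;

// `config` is a MongoSourceConfig built or loaded as shown above.
let source = MongoSource::new(config).await?;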
let schema = source.config_schema();
println!("{}", serde_json::to_string_pretty(&schema)?);

Examples

Simple collection read

use faucet_source_mongodb::{MongoSource, MongoSourceConfig};
use faucet_core::Source;

let config = MongoSourceConfig::new(
    "mongodb://localhost:27017",
    "mydb",
    "customers",
);
let source = MongoSource::new(config).await?;
let customers = source.fetch_all().await?;

Filtered query with projection and sort

use faucet_source_mongodb::{MongoSource, MongoSourceConfig};
use faucet_core::Source;
use serde_json::json;

let config = MongoSourceConfig::new(
    "mongodb://localhost:27017",
    "ecommerce",
    "orders",
)
.filter(json!({
    "status": "completed",
    "total": { "$gt": 100 }
}))
.projection(json!({
    "_id": 0,
    "order_id": 1,
    "customer": 1,
    "total": 1,
    "items": 1
}))
.sort(json!({"total": -1}))
.limit(500)
.cursor_batch_size(100);

let source = MongoSource::new(config).await?;
let orders = source.fetch_all().await?;
println!("Found {} high-value orders", orders.len());

Using with a Pipeline

use faucet_source_mongodb::{MongoSource, MongoSourceConfig};
use faucet_core::{Pipeline, Source, Sink};
use serde_json::json;

let config = MongoSourceConfig::new(
    "mongodb://mongo.production.com:27017",
    "logs",
    "app_events",
)
.filter(json!({"level": "error"}))
.sort(json!({"timestamp": -1}))
.limit(10000);

let source = MongoSource::new(config).await?;
// `my_sink` stands for any value implementing faucet_core::Sink.
let pipeline = Pipeline::new(Box::new(source), Box::new(my_sink));
let result = pipeline.run().await?;

License

Licensed under MIT or Apache-2.0.