---
title: MongoDB Examples
description: Complete examples for MongoDB processing with Spring Batch RS
sidebar:
  order: 5
---
import { Tabs, TabItem, Card, CardGrid, Aside } from '@astrojs/starlight/components';
<Aside type="tip">
View the complete source: [examples/mongodb_processing.rs](https://github.com/sboussekeyt/spring-batch-rs/blob/main/examples/mongodb_processing.rs)
</Aside>
This page provides comprehensive examples for working with MongoDB using Spring Batch RS.
## Setup
Add MongoDB features to your `Cargo.toml`:
```toml
[dependencies]
spring-batch-rs = { version = "0.1", features = ["mongodb"] }
mongodb = { version = "2.8", features = ["sync"] } # the examples use the blocking mongodb::sync API
serde = { version = "1.0", features = ["derive"] }
```
---
## Basic MongoDB Reading
```rust
use spring_batch_rs::{
    core::step::{StepBuilder, StepExecution},
    item::{
        logger::LoggerWriterBuilder,
        mongodb::mongodb_reader::{MongodbItemReaderBuilder, WithObjectId},
    },
};
use mongodb::{
    bson::{doc, oid::ObjectId},
    sync::Client,
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Book {
    // `None` lets MongoDB generate the `_id` when a new book is inserted.
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    id: Option<ObjectId>,
    title: String,
    author: String,
    isbn: String,
    year: i32,
}

impl WithObjectId for Book {
    fn get_id(&self) -> ObjectId {
        // Documents read from MongoDB always carry an `_id`. Falling back to
        // `ObjectId::new()` would hand the reader a fake cursor position.
        self.id.expect("document read from MongoDB has no _id")
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::with_uri_str("mongodb://localhost:27017")?;
    let db = client.database("library");
    let collection = db.collection::<Book>("books");

    let reader = MongodbItemReaderBuilder::new()
        .collection(&collection)
        .filter(doc! {}) // empty filter = all documents
        .page_size(20)
        .build();

    let writer = LoggerWriterBuilder::<Book>::new().build();

    let step = StepBuilder::new("read-mongodb")
        .chunk::<Book, Book>(10)
        .reader(&reader)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new("read-mongodb");
    step.execute(&mut execution)?;
    Ok(())
}
```
<Aside type="tip">
Your document type must implement the `WithObjectId` trait. The reader relies on it for cursor-based pagination, so `get_id` must return the document's actual `_id`.
</Aside>
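Why the trait matters can be seen in a simplified, in-memory model of keyset (cursor-based) pagination on `_id`. This is a sketch of the general technique, assumed rather than taken from the crate's source: plain `u64` values stand in for `ObjectId`, and `fetch_page`/`read_all` are hypothetical names. Each page asks for documents whose id is greater than the last id seen, so every item has to report the id it was actually stored under.

```rust
/// Stand-in for a stored document: `id` plays the role of `_id`.
#[derive(Debug, Clone)]
struct Doc {
    id: u64,
    title: String,
}

/// Up to `page_size` documents with `id > after`, in ascending id order,
/// like a find on `{ _id: { $gt: last_id } }` sorted by `_id` with a limit.
fn fetch_page(store: &[Doc], after: Option<u64>, page_size: usize) -> Vec<Doc> {
    let mut page: Vec<Doc> = store
        .iter()
        .filter(|d| after.map_or(true, |last| d.id > last))
        .cloned()
        .collect();
    page.sort_by_key(|d| d.id);
    page.truncate(page_size);
    page
}

/// Reads every document page by page, resuming after the last id seen.
fn read_all(store: &[Doc], page_size: usize) -> Vec<Doc> {
    let mut out = Vec::new();
    let mut last_id: Option<u64> = None;
    loop {
        let page = fetch_page(store, last_id, page_size);
        if page.is_empty() {
            break; // an empty page means the collection is exhausted
        }
        // The next query starts after the id reported by this page's last item.
        last_id = page.last().map(|d| d.id);
        out.extend(page);
    }
    out
}

fn main() {
    // Stored out of order to show that paging follows id order.
    let store: Vec<Doc> = [4, 1, 7, 2, 6, 3, 5]
        .into_iter()
        .map(|id| Doc { id, title: format!("Book {id}") })
        .collect();
    for doc in read_all(&store, 3) {
        println!("{} {}", doc.id, doc.title);
    }
}
```

If an item reported a newly generated id instead of its stored `_id`, that value would typically sort after every existing document, so the next page would come back empty and the read would end early.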
---
## MongoDB Writing
```rust
use mongodb::sync::Client;
use spring_batch_rs::{
    core::step::{StepBuilder, StepExecution},
    item::{
        csv::CsvItemReaderBuilder,
        mongodb::mongodb_writer::MongodbItemWriterBuilder,
    },
};

// `Book` is the struct defined in the reading example above. When the CSV file
// has no `_id` column, `id` deserializes to `None` and is left out on insert,
// so MongoDB assigns each new document an `_id`.

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::with_uri_str("mongodb://localhost:27017")?;
    let db = client.database("library");
    let collection = db.collection::<Book>("books");

    let reader = CsvItemReaderBuilder::<Book>::new()
        .has_headers(true)
        .from_path("books.csv")?;

    let writer = MongodbItemWriterBuilder::new()
        .collection(&collection)
        .build();

    let step = StepBuilder::new("csv-to-mongodb")
        .chunk::<Book, Book>(100)
        .reader(&reader)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new("csv-to-mongodb");
    step.execute(&mut execution)?;
    Ok(())
}
```
---
## Filtered Queries
### Query with Filter
```rust
let filter = doc! {
    "author": "J.K. Rowling",
    "year": { "$gte": 2000 }
};
let reader = MongodbItemReaderBuilder::new()
    .collection(&collection)
    .filter(filter)
    .page_size(50)
    .build();
```
### Multiple Conditions
```rust
let filter = doc! {
    "$and": [
        { "year": { "$gte": 2020 } },
        { "price": { "$lt": 30.0 } },
        { "in_stock": true }
    ]
};
```
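Listing several keys in a single `doc!` is already an implicit AND in MongoDB; `$and` is mainly needed when the same field or operator appears in more than one condition. When testing a step, it can help to express the same conditions as a plain Rust predicate and check which sample records it selects. A minimal sketch: the `Listing` struct and the `matches` helper are hypothetical, and the predicate ignores MongoDB-specific behavior such as missing fields and mixed numeric types.

```rust
/// Hypothetical record carrying the fields referenced by the filter.
struct Listing {
    year: i32,
    price: f64,
    in_stock: bool,
}

/// Same logic as { $and: [ {year: {$gte: 2020}}, {price: {$lt: 30.0}}, {in_stock: true} ] }.
fn matches(l: &Listing) -> bool {
    l.year >= 2020 && l.price < 30.0 && l.in_stock
}

fn main() {
    let samples = [
        Listing { year: 2021, price: 25.0, in_stock: true },  // selected
        Listing { year: 2019, price: 25.0, in_stock: true },  // too old
        Listing { year: 2022, price: 30.0, in_stock: true },  // $lt is strict
        Listing { year: 2023, price: 10.0, in_stock: false }, // out of stock
    ];
    let selected = samples.iter().filter(|l| matches(l)).count();
    println!("{selected} of {} samples match", samples.len());
}
```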
---
## MongoDB to JSON Export
```rust
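use mongodb::{bson::doc, sync::Client};
use spring_batch_rs::{
    core::step::{StepBuilder, StepExecution},
    item::{
        json::JsonItemWriterBuilder,
        mongodb::mongodb_reader::MongodbItemReaderBuilder,
    },
};

// `Book` is the struct (with its `WithObjectId` impl) from the reading example above.

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::with_uri_str("mongodb://localhost:27017")?;
    let db = client.database("library");
    let collection = db.collection::<Book>("books");

    // Export only books published in 2020 or later.
    let filter = doc! { "year": { "$gte": 2020 } };
    let reader = MongodbItemReaderBuilder::new()
        .collection(&collection)
        .filter(filter)
        .page_size(100)
        .build();

    let writer = JsonItemWriterBuilder::<Book>::new()
        .pretty_formatter(true)
        .from_path("books_export.json")?;

    let step = StepBuilder::new("mongodb-to-json")
        .chunk::<Book, Book>(100)
        .reader(&reader)
        .writer(&writer)
        .build();

    // Building the step does not run it; execute it explicitly.
    let mut execution = StepExecution::new("mongodb-to-json");
    step.execute(&mut execution)?;
    Ok(())
}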
```
---
## Data Transformation
```rust
use serde::{Deserialize, Serialize};
use spring_batch_rs::{
    core::item::{ItemProcessor, ItemProcessorResult},
    error::BatchError,
};

#[derive(Deserialize, Clone)]
struct RawBook {
    title: String,
    author: String,
    price: String, // stored as a string in the source
}

#[derive(Serialize)]
struct ProcessedBook {
    title: String,
    author: String,
    price: f64, // parsed to a float
    category: String,
}

struct BookProcessor;

impl ItemProcessor<RawBook, ProcessedBook> for BookProcessor {
    fn process(&self, item: &RawBook) -> ItemProcessorResult<ProcessedBook> {
        // A malformed price fails the item with a processor error.
        let price = item
            .price
            .parse::<f64>()
            .map_err(|e| BatchError::ItemProcessor(format!("Invalid price: {}", e)))?;

        // Derive a category from keywords in the title, checked in order.
        let title = item.title.to_lowercase();
        let category = if title.contains("rust") {
            "Programming"
        } else if title.contains("novel") {
            "Fiction"
        } else {
            "General"
        }
        .to_string();

        Ok(ProcessedBook {
            title: item.title.clone(),
            author: item.author.clone(),
            price,
            category,
        })
    }
}

// Wire it into a step with `.chunk::<RawBook, ProcessedBook>(n)` and
// `.processor(&BookProcessor)`. Reading `RawBook` with the MongoDB reader also
// requires an `_id` field and a `WithObjectId` impl, as in the `Book` example.
```
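The parsing and categorization rules inside `BookProcessor` do not depend on the batch runtime, so one option is to pull them into plain functions and unit-test them on their own. A sketch of that refactoring: `parse_price` and `categorize` are hypothetical helper names, and errors are returned as `String` here instead of `BatchError`.

```rust
/// Parse a price string such as "19.99", with the same message format as the processor.
fn parse_price(raw: &str) -> Result<f64, String> {
    raw.parse::<f64>().map_err(|e| format!("Invalid price: {e}"))
}

/// Same keyword rules as the processor: "rust" is checked before "novel".
fn categorize(title: &str) -> &'static str {
    let lower = title.to_lowercase();
    if lower.contains("rust") {
        "Programming"
    } else if lower.contains("novel") {
        "Fiction"
    } else {
        "General"
    }
}

fn main() {
    for (title, price) in [("The Rust Book", "39.95"), ("A Novel Idea", "abc")] {
        match parse_price(price) {
            Ok(p) => println!("{title}: {} at {p}", categorize(title)),
            Err(e) => println!("{title}: rejected ({e})"),
        }
    }
}
```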
---
## MongoDB to PostgreSQL ETL
```rust
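use mongodb::{bson::doc, sync::Client};
use spring_batch_rs::{
    core::step::{StepBuilder, StepExecution},
    item::{
        mongodb::mongodb_reader::MongodbItemReaderBuilder,
        rdbc::postgres::PostgresItemWriterBuilder,
    },
};
use sqlx::PgPool;

// `Book` is the struct (with its `WithObjectId` impl) from the reading example above.

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Source: MongoDB.
    // Caution: the `mongodb::sync` client blocks on its own internal runtime, and
    // blocking calls made directly inside an async context can panic. If that
    // happens, one option is to run the synchronous code in `tokio::task::spawn_blocking`.
    let mongo_client = Client::with_uri_str("mongodb://localhost:27017")?;
    let db = mongo_client.database("library");
    let collection = db.collection::<Book>("books");
    let reader = MongodbItemReaderBuilder::new()
        .collection(&collection)
        .filter(doc! {})
        .page_size(100)
        .build();

    // Target: PostgreSQL (the `books` table must already exist).
    let pg_pool = PgPool::connect("postgres://user:pass@localhost/library_db").await?;
    let writer = PostgresItemWriterBuilder::new()
        .pool(pg_pool)
        .table("books")
        // Push one row of bound values per book.
        .binder(|query, book: &Book| {
            query.push_values([book], |mut b, book| {
                b.push_bind(&book.title)
                    .push_bind(&book.author)
                    .push_bind(&book.isbn)
                    .push_bind(&book.year);
            });
        })
        .build();

    let step = StepBuilder::new("mongodb-to-postgres")
        .chunk::<Book, Book>(200)
        .reader(&reader)
        .writer(&writer)
        .build();

    // Building the step does not run it; execute it explicitly.
    let mut execution = StepExecution::new("mongodb-to-postgres");
    step.execute(&mut execution)?;
    Ok(())
}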
```
---
## Complex Document Structures
### Nested Objects
```rust
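use std::collections::HashMap;

use mongodb::bson::oid::ObjectId;
use serde::{Deserialize, Serialize};
use spring_batch_rs::item::mongodb::mongodb_reader::WithObjectId;

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Author {
    name: String,
    bio: String,
    birth_year: i32,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Review {
    user: String,
    rating: i32,
    comment: String,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
struct ComplexBook {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    id: Option<ObjectId>,
    title: String,
    author: Author,                    // embedded document
    reviews: Vec<Review>,              // array of embedded documents
    tags: Vec<String>,                 // array of strings
    metadata: HashMap<String, String>, // free-form key/value pairs
}

impl WithObjectId for ComplexBook {
    fn get_id(&self) -> ObjectId {
        // Return the stored `_id`, as in the `Book` example.
        self.id.expect("document read from MongoDB has no _id")
    }
}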
```
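Once deserialized, nested documents and arrays are ordinary Rust values, so a processor can work with them directly. A small sketch that computes a book's average review rating: `average_rating` is a hypothetical helper, and `Review` is a trimmed copy of the struct above with the serde derives dropped so the example runs with the standard library alone.

```rust
/// Trimmed copy of the `Review` struct above.
struct Review {
    user: String,
    rating: i32,
}

/// Average rating, or `None` when there are no reviews (avoids dividing by zero).
fn average_rating(reviews: &[Review]) -> Option<f64> {
    if reviews.is_empty() {
        return None;
    }
    let total: i32 = reviews.iter().map(|r| r.rating).sum();
    Some(total as f64 / reviews.len() as f64)
}

fn main() {
    let reviews = vec![
        Review { user: "ana".into(), rating: 5 },
        Review { user: "bo".into(), rating: 4 },
    ];
    for r in &reviews {
        println!("{} rated {}", r.user, r.rating);
    }
    println!("average: {:?}", average_rating(&reviews));
}
```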
---
## Aggregation Pipeline Results
```rust
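#[derive(Debug, Deserialize, Clone)]
struct BookSummary {
    // After `$group`, `_id` holds the group key (here the author's name).
    #[serde(rename = "_id")]
    author: String,
    total_books: i32,
    avg_rating: f64,
    newest_year: i32,
}

// Note: for aggregations, run the pipeline first and write its results to a
// temporary collection (for example with a `$out` stage), then read from there.
// Because `_id` is a string here, this type cannot supply the ObjectId that
// `WithObjectId` expects. One option is a `$project` stage that moves the key
// into an `author` field and drops `_id`, so MongoDB should assign fresh
// ObjectIds when the results are written; then map `author` as a normal field
// and add an `_id` field as in `Book`.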
```
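To know what the temporary collection should contain, it can help to model the grouping in plain Rust. This sketch assumes a pipeline that groups by author with `$sum: 1`, `$avg` over a rating, and `$max` over the year; that pipeline is not shown above, and the `BookRow` input type is hypothetical. MongoDB does not guarantee `$group` output order, so the `BTreeMap` here only makes the sketch deterministic.

```rust
use std::collections::BTreeMap;

/// Hypothetical flattened input row.
struct BookRow {
    author: &'static str,
    rating: f64,
    year: i32,
}

/// Mirrors the `BookSummary` fields; `author` corresponds to `_id`.
#[derive(Debug, PartialEq)]
struct BookSummary {
    author: String,
    total_books: i32,
    avg_rating: f64,
    newest_year: i32,
}

/// Group rows by author, like `$group: { _id: "$author", ... }`.
fn summarize(rows: &[BookRow]) -> Vec<BookSummary> {
    // author -> (count, rating sum, newest year)
    let mut groups: BTreeMap<&str, (i32, f64, i32)> = BTreeMap::new();
    for r in rows {
        let e = groups.entry(r.author).or_insert((0, 0.0, i32::MIN));
        e.0 += 1; // $sum: 1
        e.1 += r.rating; // accumulated for $avg
        e.2 = e.2.max(r.year); // $max
    }
    groups
        .into_iter()
        .map(|(author, (count, sum, newest))| BookSummary {
            author: author.to_string(),
            total_books: count,
            avg_rating: sum / count as f64,
            newest_year: newest,
        })
        .collect()
}

fn main() {
    let rows = [
        BookRow { author: "Austen", rating: 4.0, year: 1813 },
        BookRow { author: "Austen", rating: 5.0, year: 1815 },
        BookRow { author: "Orwell", rating: 4.5, year: 1949 },
    ];
    for s in summarize(&rows) {
        println!("{s:?}");
    }
}
```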
---
## Real-World Example: Data Migration
```rust
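use chrono::Utc;
use mongodb::{
    bson::{doc, oid::ObjectId},
    sync::Client,
};
use serde::{Deserialize, Serialize};
use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        step::{StepBuilder, StepExecution},
    },
    item::mongodb::{
        mongodb_reader::{MongodbItemReaderBuilder, WithObjectId},
        mongodb_writer::MongodbItemWriterBuilder,
    },
};

#[derive(Deserialize, Clone)]
struct LegacyUser {
    // The MongoDB reader paginates via `WithObjectId`, so the source type needs `_id` too.
    #[serde(rename = "_id")]
    id: ObjectId,
    user_id: String,
    full_name: String,
    email_address: String,
    signup_date: String,
}

impl WithObjectId for LegacyUser {
    fn get_id(&self) -> ObjectId {
        self.id
    }
}

#[derive(Serialize)]
struct ModernUser {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    id: Option<ObjectId>,
    user_id: String,
    first_name: String,
    last_name: String,
    email: String,
    created_at: String,
    migrated_at: String,
}

struct UserMigrationProcessor;

impl ItemProcessor<LegacyUser, ModernUser> for UserMigrationProcessor {
    fn process(&self, item: &LegacyUser) -> ItemProcessorResult<ModernUser> {
        // First token becomes the first name; the rest becomes the last name.
        let parts: Vec<&str> = item.full_name.split_whitespace().collect();
        let (first_name, last_name) = if parts.len() >= 2 {
            (parts[0].to_string(), parts[1..].join(" "))
        } else {
            (item.full_name.clone(), String::new())
        };
        Ok(ModernUser {
            id: None, // let MongoDB assign a new _id
            user_id: item.user_id.clone(),
            first_name,
            last_name,
            email: item.email_address.to_lowercase(),
            created_at: item.signup_date.clone(),
            migrated_at: Utc::now().to_rfc3339(),
        })
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::with_uri_str("mongodb://localhost:27017")?;
    let db = client.database("myapp");
    let source_collection = db.collection::<LegacyUser>("legacy_users");
    let target_collection = db.collection::<ModernUser>("users");

    // Only read users not yet flagged as migrated. This step never sets
    // `migrated: true` on the source documents, so a re-run inserts
    // duplicates unless the flag is updated afterwards.
    let reader = MongodbItemReaderBuilder::new()
        .collection(&source_collection)
        .filter(doc! { "migrated": { "$ne": true } })
        .page_size(100)
        .build();
    let processor = UserMigrationProcessor;
    let writer = MongodbItemWriterBuilder::new()
        .collection(&target_collection)
        .build();

    let step = StepBuilder::new("migrate-users")
        .chunk::<LegacyUser, ModernUser>(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new("migrate-users");
    step.execute(&mut execution)?;
    Ok(())
}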
```
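The name-splitting rule in `UserMigrationProcessor` is easy to get wrong on edge cases, so it may be worth testing on its own. A sketch with `split_full_name` as a hypothetical standalone helper reproducing the processor's logic: the first whitespace-separated token becomes the first name, and the remaining tokens, re-joined with single spaces, become the last name.

```rust
/// Same rule as the migration processor.
fn split_full_name(full_name: &str) -> (String, String) {
    let parts: Vec<&str> = full_name.split_whitespace().collect();
    if parts.len() >= 2 {
        (parts[0].to_string(), parts[1..].join(" "))
    } else {
        // Zero or one token: keep the original string as the first name.
        (full_name.to_string(), String::new())
    }
}

fn main() {
    for name in ["Ada Lovelace", "Gabriel  García   Márquez", "Cher", "  "] {
        let (first, last) = split_full_name(name);
        println!("{name:?} -> first={first:?}, last={last:?}");
    }
}
```

Note that the single-token branch keeps the original, untrimmed string; if legacy data contains stray spaces, trimming there may be worth considering.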
---
## Performance Tips
<CardGrid>
<Card title="Page Size" icon="rocket">
Start with page sizes of 100-500 and tune from there: larger pages mean fewer round trips but more documents held in memory per fetch
</Card>
<Card title="Indexes" icon="star">
Ensure collections have indexes on filter fields
</Card>
<Card title="Projection" icon="setting">
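Keep structs limited to the fields you need. Serde ignores extra fields, but they are still transferred; a projection (a find option, not part of the filter) is what reduces traffic, if the reader exposes one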
</Card>
<Card title="Bulk Writes" icon="list">
Writer uses `insert_many()` for efficient batch inserts
</Card>
</CardGrid>
## Best Practices
<Aside type="tip">
**MongoDB Tips:**
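1. **Implement WithObjectId**: Required by the reader for pagination; return the stored `_id`
2. **Use Filters**: Apply filters to reduce the number of documents scanned
3. **Connection Strings**: Include credentials and options such as `authSource` and `replicaSet` in the URI when your deployment requires them
4. **Error Handling**: Use `skip_limit()` to tolerate a bounded number of bad records
5. **Batch Size**: Keep the chunk size equal to the page size (or a multiple of it) so chunk boundaries line up with page fetches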
</Aside>
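To see how page size and chunk size interact, a back-of-the-envelope model can count cursor page fetches and chunk writes for a given number of documents. This is a simplified sketch that assumes one query per non-empty page and one insert call per chunk; the driver may split large `insert_many` calls further, and `round_trips` is a hypothetical helper.

```rust
/// Ceiling division: how many groups of `size` are needed for `n` items.
fn ceil_div(n: usize, size: usize) -> usize {
    (n + size - 1) / size
}

/// Returns (non-empty page fetches, chunk writes) for `docs` documents,
/// assuming one query per page and one insert call per chunk.
fn round_trips(docs: usize, page_size: usize, chunk_size: usize) -> (usize, usize) {
    assert!(page_size > 0 && chunk_size > 0, "sizes must be positive");
    (ceil_div(docs, page_size), ceil_div(docs, chunk_size))
}

fn main() {
    // Page/chunk pairs taken from the examples on this page.
    for (page, chunk) in [(20, 10), (100, 100), (100, 200)] {
        let (pages, writes) = round_trips(10_000, page, chunk);
        println!("page={page} chunk={chunk}: {pages} page fetches, {writes} chunk writes");
    }
}
```

With the chunk size a multiple of the page size, each chunk is filled from whole pages; a smaller chunk than page (as in the first pair) doubles the number of write calls.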
## Next Steps
- [Database Examples](/spring-batch-rs/examples/database/) - SQL database examples
- [API Reference](/spring-batch-rs/api/item-reader/) - Complete API documentation
- [Performance Guide](/spring-batch-rs/reference/performance/) - Tuning tips