# mongor
A Rust-based ODM for MongoDB, built on top of the official [`mongodb`](https://crates.io/crates/mongodb) crate. It provides an ergonomic interface to the underlying driver, alongside a simplified testing framework that makes writing test suites easier.
Table of Contents
-----------------
- [Requirements](#requirements)
- [Installation](#installation)
- [Documentation](#documentation)
- [Usage](#usage)
  - [Overview](#overview)
  - [Basic Example](#basic-example)
- [Sessions](#sessions)
  - [Overview](#overview-1)
  - [Basic Example](#basic-example-1)
  - [Atomic Transactions](#atomic-transactions)
- [Testing](#testing)
  - [Overview](#overview-2)
  - [Basic Example](#basic-example-2)
- [GridFS](#gridfs)
  - [Overview](#overview-3)
  - [Basic Example](#basic-example-3)
- [Bug Reports](#bug-reports)
- [Feature Requests](#feature-requests)
- [Contributing](#contributing)
- [Project Status](#project-status)
- [License](#license)
## Requirements
- MongoDB deployment (version 3.6+):
  - [Official installation guide](https://www.mongodb.com/docs/manual/installation/)
  - [Installation guide when using WSL2 + Linux (*Guide by Sean Welsh Brown*)](https://dev.to/seanwelshbrown/installing-mongodb-on-windows-subsystem-for-linux-wsl-2-19m9)
  - [Official Atlas (cloud) deployment guide](https://www.mongodb.com/basics/mongodb-atlas-tutorial)
- Rust (version 1.6+):
  - [Official installation guide](https://www.rust-lang.org/tools/install)
## Installation
`cargo add mongor`
or add:
```toml
mongor = "0.1.0"
```
to `[dependencies]` in `Cargo.toml`.
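If you plan on using the GridFS integration described below, the `grid_fs` crate feature must be enabled as well; a sketch of what the dependency entry might look like:
```toml
[dependencies]
# `grid_fs` exposes the `GridFs` struct and pulls in `futures_util`
mongor = { version = "0.1.0", features = ["grid_fs"] }
```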
## Documentation
This README provides a general overview and practical examples, but does not cover every available method. [Full crate documentation can be found at docs.rs](https://docs.rs/mongor/latest/mongor/).
## Usage
### Overview
While the crate exposes the `core` module, the main interface of this crate is the `Model<D>` struct. The struct is generic over `D`, which represents the structure of the document you want the model to manage. The examples below demonstrate the general pattern for using a model, along with some more ergonomic ways to perform atomic transactions and build a test suite.
### Basic Example
```rust
use mongor::{
core::find::FindManyCursor,
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
results::*,
Client, Collection,
},
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client_options = ClientOptions::parse("mongodb://localhost:27017/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let collection: Collection<Shark> = db.collection("shark");
let model: Model<Shark> = Model::from(collection);
// Applying an index
model
.apply_unique_index(doc! { "species": 1 }, Some("species_unique"), None, None)
.await?;
// Inserting a document
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let document_oid: ObjectId = model.insert_one(new_shark, None, None).await?;
// Updating a document
let update_result: UpdateResult = model
.update_one_by_oid(
&document_oid,
doc! {
"$inc": {
"sightings": 1,
},
},
None,
None,
)
.await?;
// Finding a document
let shark: Option<Shark> = model
.find_one_by_field_value("sightings", 1, None, None)
.await?;
println!("{:?}", shark);
// Log example:
// ```
// Some(Shark {
// oid: Some(ObjectId("65712bdfb1fb166eb8cce7e5")),
// name: "Whale Shark",
// species: "Rhincodon typus",
// sightings: 1,
// })
// ```
// Deleting a document
let delete_result: DeleteResult = model
.delete_one(
doc! {
"_id": &document_oid,
},
None,
None,
)
.await?;
// Inserting many
let sharks = vec![
Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
},
Shark {
oid: None,
name: "Great White".to_string(),
species: "Carcharodon carcharias".to_string(),
sightings: 0,
},
];
let document_oids: Vec<ObjectId> = model.insert_many(sharks, None, None).await?;
// Cursoring through a `find_many` query
let options = FindOptions::builder().sort(doc! { "name": -1 }).build();
let mut cursor: FindManyCursor<Shark> = model
.find_many(doc! { "sightings": 0 }, Some(options), None)
.await?;
while let Some(shark) = cursor.next().await? {
// shark: Shark
}
// Updating many
let update_result: UpdateResult = model
.update_many(
doc! { "sightings": 0 },
doc! { "$inc": { "sightings": 5 }},
None,
None,
)
.await?;
// Deleting many
let delete_result: DeleteResult = model.delete_many(doc! {}, None, None).await?;
db.drop(None).await?;
Ok(())
}
```
## Sessions
Official references:
- [Sessions](https://www.mongodb.com/docs/manual/reference/method/Session/)
- [Replication](https://www.mongodb.com/docs/manual/replication/)
### Overview
Please note: to use sessions and transactions with MongoDB, you currently need a replica set deployment. Setting one up is simple: run multiple `mongod` (MongoDB server) instances configured as a replica set, then initiate the set with the shell method [`rs.initiate()`](https://www.mongodb.com/docs/manual/reference/method/rs.initiate/). Here is an example of how to set up a replica set with three nodes.
First, create a separate data directory for each node:
```sh
mkdir -p ~/data/mongo_rp_1 ~/data/mongo_rp_2 ~/data/mongo_rp_3
```
Then start three separate `mongod` instances, each in its own terminal, under the replica set name `rs0`:
```sh
sudo mongod --port 27017 --dbpath ~/data/mongo_rp_1 --replSet rs0
sudo mongod --port 27018 --dbpath ~/data/mongo_rp_2 --replSet rs0
sudo mongod --port 27019 --dbpath ~/data/mongo_rp_3 --replSet rs0
```
Next, connect to the instance on the default port, `27017`:
```sh
mongo --port 27017
```
Finally, call `rs.initiate()` to initialize the replica set with the specified members:
```js
rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "localhost:27017" },
    { _id: 1, host: "localhost:27018" },
    { _id: 2, host: "localhost:27019" }
  ]
});
```
You now have a replica set named `rs0` with three nodes.
### Basic Example
```rust
use mongor::{
core::{
find::FindManyCursor,
session::{commit_transaction, start_session, start_transaction},
},
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
results::*,
Client, Collection,
},
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Note: We pass all three of our nodes hosts to the client options
let client_options =
ClientOptions::parse("mongodb://localhost:27017,localhost:27018,localhost:27019/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let collection: Collection<Shark> = db.collection("shark");
let model: Model<Shark> = Model::from(collection);
let mut session = start_session(&client, None).await?;
start_transaction(&mut session, None).await?;
// Inserting a document with a session
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let document_oid: ObjectId = model
.insert_one(new_shark, None, Some(&mut session))
.await?;
commit_transaction(&mut session).await?;
// Cursoring through a `find_many` query with a session
let options = FindOptions::builder().sort(doc! { "name": -1 }).build();
let mut cursor: FindManyCursor<Shark> = model
.find_many(doc! { "sightings": 0 }, Some(options), Some(&mut session))
.await?;
while let Some(shark) = cursor.next_with_session(&mut session).await? {
// shark: Shark
}
db.drop(None).await?;
Ok(())
}
```
### Atomic Transactions
A method called `run_atomic_transaction` is provided in `core::atomic`. It offers a shorthand for running an async closure whose database operations are protected by a session and a single transaction. On failure, it automatically attempts to abort the transaction; if the closure returns `Ok(())`, it attempts to commit. This creates a more ergonomic way to structure transactions that must be atomic.
*Note*: This method uses `std::sync::Mutex` in its implementation. I made this decision to keep the crate as agnostic and lightweight as possible, but if the need arises, open a PR or an issue to request a feature that enables a `tokio::sync::Mutex` version.
Example:
```rust
use mongor::{
core::{atomic::run_atomic_transaction, session::start_session},
error::Error,
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
Client, ClientSession, Collection,
},
};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Researcher {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub sighted: Vec<String>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Note: We pass all three of our nodes hosts to the client options
let client_options =
ClientOptions::parse("mongodb://localhost:27017,localhost:27018,localhost:27019/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let shark_collection: Collection<Shark> = db.collection("shark");
let researcher_collection: Collection<Researcher> = db.collection("researcher");
let shark_model: Model<Shark> = Model::from(shark_collection);
let researcher_model: Model<Researcher> = Model::from(researcher_collection);
let session = start_session(&client, None).await?;
let shared_session = Arc::new(Mutex::new(session));
let shark_model_ref = &shark_model;
let researcher_model_ref = &researcher_model;
run_atomic_transaction(
|shared_session: Arc<Mutex<ClientSession>>| async move {
let mut session_lock = shared_session
.lock()
// Note: Here I am using `mongor::error::Error`, but you can use any error
// type that implements `dyn std::error::Error` inside this closure to use `?`
.map_err(|err| Error::SharedSessionLockFailure(err.to_string()))?;
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let new_researcher = Researcher {
oid: None,
name: "John Doe".to_string(),
sighted: vec![],
};
let shark_oid: ObjectId = shark_model_ref
.insert_one(new_shark, None, Some(&mut session_lock))
.await?;
let researcher_oid: ObjectId = researcher_model_ref
.insert_one(new_researcher, None, Some(&mut session_lock))
.await?;
researcher_model_ref
.update_one_by_oid(
&researcher_oid,
doc! {
"$push": {
"sighted": "Whale Shark",
},
},
None,
Some(&mut session_lock),
)
.await?;
shark_model_ref
.update_one_by_oid(
&shark_oid,
doc! {
"$inc": {
"sightings": 1,
},
},
None,
Some(&mut session_lock),
)
.await?;
Ok(())
},
shared_session,
None,
)
.await?;
let shark = shark_model.find_one(doc! {}, None, None).await?;
let researcher = researcher_model.find_one(doc! {}, None, None).await?;
println!("{:?}", shark);
// Example log:
// Some(Shark {
// oid: Some(ObjectId("657226d179aa103ff4cf05ab")),
// name: "Whale Shark",
// species: "Rhincodon typus",
// sightings: 1,
// })
println!("{:?}", researcher);
// Example log:
// Some(Researcher {
// oid: Some(ObjectId("657226d179aa103ff4cf05ac")),
// name: "John Doe",
// sighted: ["Whale Shark"],
// })
db.drop(None).await?;
Ok(())
}
```
Now, let's modify the closure to fail on purpose; you will notice that the operations inside the closure never take effect.
```rust
use mongor::{
core::{atomic::run_atomic_transaction, session::start_session},
error::Error,
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
Client, ClientSession, Collection,
},
};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Researcher {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub sighted: Vec<String>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Note: We pass all three of our nodes hosts to the client options
let client_options =
ClientOptions::parse("mongodb://localhost:27017,localhost:27018,localhost:27019/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let shark_collection: Collection<Shark> = db.collection("shark");
let researcher_collection: Collection<Researcher> = db.collection("researcher");
let shark_model: Model<Shark> = Model::from(shark_collection);
let researcher_model: Model<Researcher> = Model::from(researcher_collection);
let session = start_session(&client, None).await?;
let shared_session = Arc::new(Mutex::new(session));
let shark_model_ref = &shark_model;
let researcher_model_ref = &researcher_model;
let _ = run_atomic_transaction(
|shared_session: Arc<Mutex<ClientSession>>| async move {
let mut session_lock = shared_session
.lock()
// Note: Here I am using `mongor::error::Error`, but you can use any error
// type that implements `dyn std::error::Error` inside this closure to use `?`
.map_err(|err| Error::SharedSessionLockFailure(err.to_string()))?;
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let new_researcher = Researcher {
oid: None,
name: "John Doe".to_string(),
sighted: vec![],
};
let shark_oid: ObjectId = shark_model_ref
.insert_one(new_shark, None, Some(&mut session_lock))
.await?;
let researcher_oid: ObjectId = researcher_model_ref
.insert_one(new_researcher, None, Some(&mut session_lock))
.await?;
// Purposefully erroring after insertions
Err(Error::internal("some error"))?;
Ok(())
},
shared_session,
None,
)
.await;
let shark = shark_model.find_one(doc! {}, None, None).await?;
let researcher = researcher_model.find_one(doc! {}, None, None).await?;
// Note: the insertions did not take effect because the transaction as a whole was aborted.
println!("{:?}", shark);
// Logs:
// `None`
println!("{:?}", researcher);
// Logs:
// `None`
db.drop(None).await?;
Ok(())
}
```
## Testing
### Overview
A simple testing framework is provided via the `test_db` module. The general pattern is to create a new `TestDB` per test, which spawns a new `Database` named with a freshly generated `ObjectId`; this makes it easy to run a concurrent test suite, which is common with frameworks like `tokio`.
A method called `TestDB::run_test` then takes ownership of this database and runs an isolated test closure; the database is cleaned up (dropped) whether the test succeeds or fails.
Pre-defined assertions are provided in `test_db::assert_model::AssertModel`, which wraps a `Model<D>`, and the `test_db::assert_document` module provides general assertions on documents. A macro called `test_error!` from `test_db::test_error` is also provided, which acts like `panic!` inside the test closure. The closure returns a `Future<Output = Result<(), Box<dyn std::error::Error>>>`, allowing the `?` operator to end the test as well.
### Basic Example
```rust
use mongor::mongodb::bson::{doc, oid::ObjectId};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
#[cfg(test)]
mod tests {
use mongor::{
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::ClientOptions,
Client, Collection,
},
test_db::{assert_model::AssertModel, test_error::TestError, TestDB},
test_error,
};
use crate::Shark;
#[tokio::test]
pub async fn example_test() {
let client_options = ClientOptions::parse("mongodb://localhost:27017/")
.await
.unwrap();
let client = Client::with_options(client_options).unwrap();
// Opens a new isolated `Database` named with a `ObjectId`
let test_db = TestDB::new(&client);
let shark_collection: Collection<Shark> = test_db.collection("shark");
let shark_model = Model::from(shark_collection);
// Run isolated test, cleaning up the Database on failure or success
test_db
.run_test(
|| async {
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let oid = shark_model.insert_one(new_shark, None, None).await?;
// Using pre-defined assertions from `test_db::assert_model`,
// which call `test_error!` under the hood
let assert_shark_model = AssertModel::from(&shark_model);
assert_shark_model
.assert_exists(doc! { "name": "Whale Shark" }, None)
.await?
.assert_count(doc! {}, 1, None)
.await?;
// Using the `test_error!` macro from `test_db::test_error`,
// which acts like `panic!` inside the closure
let some_other_oid = ObjectId::new();
if oid == some_other_oid {
test_error!("Inserted shark oid {} == {}", oid, some_other_oid);
}
Ok(())
},
None,
)
.await
.unwrap();
}
}
```
## GridFS
Official references:
- [GridFS](https://www.mongodb.com/docs/manual/core/gridfs/)
### Overview
The crate feature `grid_fs` exposes the `GridFs` struct, a robust interface over the underlying `GridFsBucket`. This feature adds a single dependency, `futures_util`.
Many of the methods involve the concept of a `revision` number. Because the `name` field on a file document is not unique, a file can have multiple revisions, defined as:
```
0 = the original stored file
1 = the first revision
2 = the second revision
etc...
-2 = the second most recent revision
-1 = the most recent revision
```
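As an illustration of this numbering scheme, a negative revision can be resolved to a non-negative index once the total number of stored revisions is known. The helper below is hypothetical, not part of the crate's API:
```rust
// Hypothetical helper: resolve a (possibly negative) revision number to a
// non-negative index, given how many revisions of the file are stored.
fn resolve_revision(revision: i64, total_revisions: i64) -> Option<i64> {
    let index = if revision >= 0 {
        revision // 0 = original, 1 = first revision, ...
    } else {
        total_revisions + revision // -1 = most recent, -2 = second most recent, ...
    };
    (0..total_revisions).contains(&index).then_some(index)
}

fn main() {
    // With three stored files (the original plus two revisions):
    assert_eq!(resolve_revision(0, 3), Some(0)); // the original stored file
    assert_eq!(resolve_revision(-1, 3), Some(2)); // the most recent revision
    assert_eq!(resolve_revision(-2, 3), Some(1)); // the second most recent revision
    assert_eq!(resolve_revision(3, 3), None); // out of range
}
```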
The streaming methods (uploading and downloading) are implemented in the `mongodb` crate via `futures_util::io::*`, so if you are using `tokio`, you must add the `tokio_util` dependency, which provides `tokio_util::compat::*`. That module exposes `compat()` and `compat_write()` to convert tokio async readers or writers into a `Compat<_>`, which can be used directly as `futures_util::io::AsyncRead` and `futures_util::io::AsyncWrite`.
See the basic example, or the [`tokio_util::compat` documentation](https://docs.rs/tokio-util/latest/tokio_util/compat/index.html), for more information.
### Basic Example
```rust
use futures_util::io::Cursor;
use mongor::{
grid_fs::GridFs,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
Client,
},
};
use tokio::fs::*;
use tokio_util::compat::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client_options = ClientOptions::parse("mongodb://localhost:27017/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let bucket_options = GridFsBucketOptions::builder()
.bucket_name(Some("shark_images".to_string()))
.build();
let grid_fs = GridFs::new(&db, Some(bucket_options));
// Upload a file
let file = File::open("./test_data/shark-0.png").await?;
let revision_0_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;
// Upload a revision of a file
let file = File::open("./test_data/shark-1.jpg").await?;
let revision_1_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;
// Get a `core::find::FindManyCursor` of the `FilesCollectionDocument`s
let mut cursor = grid_fs.find_many(doc! {}, None).await?;
while let Some(file_document) = cursor.next().await? {
// file_document: FilesCollectionDocument
}
// Download the most recent revision of a file
let mut writer = Cursor::new(vec![]);
grid_fs.download("shark", &mut writer, None).await?;
let file_data: Vec<u8> = writer.into_inner();
// Download a specific revision of a file
let mut writer = Cursor::new(vec![]);
grid_fs.download("shark", &mut writer, Some(0)).await?;
let file_data: Vec<u8> = writer.into_inner();
// Rename a file (All revisions as None was provided to revision)
grid_fs
.rename_by_filename("shark", "new_shark", None)
.await?;
// Delete a file (All revisions as None was provided to revision)
grid_fs.delete_by_filename("new_shark", None).await?;
db.drop(None).await?;
Ok(())
}
```
## Bug Reports
Please report bugs by creating an issue, or, if you know of a suitable fix, feel free to open a PR following the `Contributing` guidelines below.
A report must relate directly to this crate, and you should provide as much information as possible, such as:
- Code examples
- Error messages
- Steps to reproduce
- System information (If applicable)
## Feature Requests
If you feel something is missing, or want a variation of the crate that would require dependencies beyond `mongodb` and `serde` (for example, `GridFs` requires `futures_util`, so it is a separate feature), please create an issue with the request and explain why it should be part of this crate rather than a third-party crate. I plan to implement a `mongor-extras` crate for additional functionality with a more defined scope, and wish to keep this crate as agnostic and flexible as possible.
## Contributing
I welcome anyone to contribute to the crate, but I do have some general requirements:
- The change should keep the crate agnostic to use cases; i.e., where possible, methods should allow the use of sessions and of options from `mongodb::options::*` instead of custom options, and exposed closures, like those seen in `run_test` or `run_atomic_transaction`, should interface well with generic types such as `dyn std::error::Error`.
- Any added or modified methods require unit tests with 100% test coverage, placed in the `tests` module.
- Any change that adds dependencies should be gated behind a separate feature.
- All current unit tests must pass; i.e., `cargo test` should succeed.
- Add your name and/or handle to `CONTRIBUTORS.md` if not already present, as well as to the `Authors` section of the file's header comment.
- If adding a new dependency, please update the `Third Party` subsection under `License` in this README to reflect its licensing.
If your change meets these guidelines, feel free to open a PR.
## Project Status
I plan to maintain this crate for the foreseeable future. The crate API is subject to change, although I anticipate simplifications and additions rather than deletions or major modifications of the current feature set.
## License
MIT
See `LICENSE.md` for more information.
### Third Party
This crate is built on top of:
- The [`mongodb`](https://github.com/mongodb/mongo-rust-driver/tree/main) crate, licensed under the Apache License 2.0 ([view it here](https://github.com/mongodb/mongo-rust-driver/blob/main/LICENSE)).
- The [`serde`](https://github.com/serde-rs/serde) crate, licensed under either the Apache License 2.0 ([view it here](https://github.com/serde-rs/serde/blob/master/LICENSE-APACHE)) or the MIT license ([view it here](https://github.com/serde-rs/serde/blob/master/LICENSE-MIT)).
- The [`futures_util`](https://github.com/rust-lang/futures-rs) crate (used in the `grid_fs` feature), licensed under either the Apache License 2.0 ([view it here](https://github.com/rust-lang/futures-rs/blob/master/LICENSE-APACHE)) or the MIT license ([view it here](https://github.com/rust-lang/futures-rs/blob/master/LICENSE-MIT)).
- The [`tokio`](https://github.com/tokio-rs/tokio) crate (used in internal testing), licensed under the MIT license ([view it here](https://github.com/tokio-rs/tokio/blob/master/LICENSE)).
- The [`tokio_util`](https://github.com/tokio-rs/tokio/tree/master/tokio-util) crate (used in internal testing), licensed under the MIT license ([view it here](https://github.com/tokio-rs/tokio/blob/master/tokio-util/LICENSE)).