use core::borrow::Borrow;
use futures::stream::TryStreamExt;
use futures::Stream;
use mongodb::bson::doc;
use mongodb::bson::oid::ObjectId;
use mongodb::bson::Document;
use crate::core::error2::Error;
use crate::core::error2::Result;
use crate::core::mongo::grid_fs::{GridFS, GridFSBucketOptions};
use mongodb::options::DistinctOptions;
use mongodb::options::{
AggregateOptions, FindOneOptions, FindOptions, ReadConcern, UpdateOptions, WriteConcern,
};
use tokio::io::AsyncRead;
#[derive(Clone)]
pub struct MongoClient {
pub client: mongodb::Client,
}
impl MongoClient {
pub async fn build(settings: &super::MongoSettings) -> Self {
Self {
client: super::make_mongo(settings).await.unwrap(),
}
}
pub async fn count(&self, _database: &str, _coll: &str) -> Result<u64> {
let _result = self
.client
.database(_database)
.collection::<mongodb::bson::Document>(_coll)
.count_documents(doc! {})
.await
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?;
Ok(_result)
}
pub async fn save_file(
&self,
_database: &str,
file_name: &str,
file_path: &std::path::PathBuf,
) -> Result<Option<String>> {
let file = tokio::fs::File::open(&file_path).await.map_err(|e| {
log::error!("upload_from_stream: error={:?}", e);
anyhow::anyhow!(e)
})?;
let mut bucket = GridFS::new(
(self.client).database(_database).clone(),
Some(GridFSBucketOptions::default()),
);
let id = &bucket
.upload_from_stream(file_name, file, None)
.await
.map_err(|e| {
log::error!("upload_from_stream: error={:?}", e);
anyhow::anyhow!(e)
})?;
Ok(Some(id.to_string()))
}
pub async fn save_file_with_opts(
&self,
_database: &str,
file_name: &str,
file_path: &std::path::PathBuf,
metadata: Document, ) -> Result<Option<String>> {
let file = tokio::fs::File::open(&file_path).await.map_err(|e| {
log::error!("save_file_with_opts: error={:?}", e);
anyhow::anyhow!(e)
})?;
let mut bucket = GridFS::new(
(self.client).database(_database).clone(),
Some(GridFSBucketOptions::default()),
);
let file_id = &bucket
.upload_from_stream(file_name, file, None)
.await
.map_err(|e| {
log::error!("upload_from_stream: error={:?}", e);
anyhow::anyhow!(e)
})?;
let filter = doc! { "_id": file_id };
let update = doc! { "$set": metadata };
let options = Some(
mongodb::options::UpdateOptions::builder()
.upsert(false)
.build(),
);
let _ = self
.client
.database(_database)
.collection::<mongodb::bson::Document>("fs.files")
.update_one(filter, update)
.with_options(options)
.await
.map_err(|e| {
log::error!("save_file_with_opts: error={:?}", e);
anyhow::anyhow!(e)
})?;
Ok(Some(file_id.to_string()))
}
pub async fn save_stream_with_opts(
&self,
_database: &str,
file_name: &str,
stream: impl AsyncRead + Unpin,
metadata: Document, ) -> Result<Option<String>> {
let mut bucket = GridFS::new(
(self.client).database(_database).clone(),
Some(GridFSBucketOptions::default()),
);
let file_id = &bucket
.upload_from_stream(file_name, stream, None)
.await
.map_err(|e| {
log::error!("upload_from_stream: error={:?}", e);
anyhow::anyhow!(e)
})?;
let filter = doc! { "_id": file_id };
let update = doc! { "$set": metadata };
let options = Some(
mongodb::options::UpdateOptions::builder()
.upsert(false)
.build(),
);
let _ = self
.client
.database(_database)
.collection::<mongodb::bson::Document>("fs.files")
.update_one(filter, update)
.with_options(options)
.await
.map_err(|e| {
log::error!("save_file_with_opts: error={:?}", e);
anyhow::anyhow!(e)
})?;
Ok(Some(file_id.to_string()))
}
pub async fn get_file_content_type(
&self,
database: &str,
file_id: &str,
) -> Result<Option<String>> {
let obj_id = ObjectId::parse_str(file_id)?;
let filter = doc! {"_id": obj_id};
let find_options = FindOneOptions::builder()
.projection(Some(doc! { "mime_type": 1 }))
.build();
let result = self
.client
.database(database)
.collection::<mongodb::bson::Document>("fs.files")
.find_one(filter)
.with_options(find_options)
.await
.map_err(|e| {
log::error!("get_file_content_type: error={:?}", e);
anyhow::anyhow!(e)
})?;
let mime_type = match result {
Some(document) => {
if let Some(mongodb::bson::Bson::String(mime_type)) = document.get("mime_type") {
Some(mime_type.to_string())
} else {
None
}
}
None => None,
};
Ok(mime_type)
}
pub async fn get_file(
&self,
_database: &str,
file_id: &str,
) -> Result<(Option<String>, impl Stream<Item = Vec<u8>>)> {
let file_id = ObjectId::parse_str(file_id)?;
let bucket = GridFS::new(
(self.client).database(_database).clone(),
Some(GridFSBucketOptions::default()),
);
let cursor = bucket.open_download_stream(file_id).await.map_err(|e| {
log::error!("open_download_stream: error={:?}", e);
anyhow::anyhow!(e)
})?;
let find_options = FindOneOptions::builder()
.projection(Some(doc! { "mime_type": 1 }))
.build();
let result = self
.client
.database(_database)
.collection::<mongodb::bson::Document>("fs.files")
.find_one(doc! { "_id": file_id })
.with_options(find_options)
.await
.map_err(|e| {
log::error!("get_file: error={:?}", e);
anyhow::anyhow!(e)
})?;
let mime_type = match result {
Some(document) => {
if let Some(mongodb::bson::Bson::String(mime_type)) = document.get("mime_type") {
Some(mime_type.to_string())
} else {
None
}
}
None => None,
};
Ok((mime_type, cursor))
}
pub async fn get_file_by_md5(
&self,
database: &str,
file_md5: &str,
) -> Result<Option<(String, u64)>> {
let filter = doc! {"md5": file_md5};
let find_options = FindOneOptions::builder()
.projection(Some(doc! { "length": 1, "_id": 1 }))
.build();
let result = self
.client
.database(database)
.collection::<mongodb::bson::Document>("fs.files")
.find_one(filter)
.with_options(find_options)
.await
.map_err(|e| {
log::error!("get_file_by_md5: error={:?}", e);
anyhow::anyhow!(e)
})?;
let result = match result {
Some(document) => {
if let Some(mongodb::bson::Bson::ObjectId(_id)) = document.get("_id") {
if let Some(mongodb::bson::Bson::Int64(length)) = document.get("length") {
Some((_id.to_string(), *length as u64))
} else {
None
}
} else {
None
}
}
None => None,
};
Ok(result)
}
pub async fn get_file_with_opts(
&self,
_database: &str,
sort: mongodb::bson::Document,
filter: mongodb::bson::Document,
skip: usize,
) -> Result<(Option<String>, impl Stream<Item = Vec<u8>>)> {
let find_options = FindOneOptions::builder()
.projection(Some(doc! { "_id": 1 }))
.sort(sort)
.skip(skip as u64)
.build();
let result = self
.client
.database(_database)
.collection::<mongodb::bson::Document>("fs.files")
.find_one(filter)
.with_options(find_options)
.await
.map_err(|e| {
log::error!("get_file_with_opts: error={:?}", e);
anyhow::anyhow!(e)
})?;
let file_id = match result {
Some(document) => {
if let Some(mongodb::bson::Bson::ObjectId(id)) = document.get("_id") {
Some(*id)
} else {
None
}
}
None => None,
};
if file_id.is_none() {
return Err(Error::UnexpectedError(anyhow::anyhow!("file not exists")));
}
let file_id = file_id.unwrap();
let bucket = GridFS::new(
(self.client).database(_database).clone(),
Some(GridFSBucketOptions::default()),
);
let cursor = bucket
.open_download_stream(file_id.to_owned())
.await
.map_err(|e| {
log::error!("get_file_with_opts: error={:?}", e);
anyhow::anyhow!(e)
})?;
let find_options = FindOneOptions::builder()
.projection(Some(doc! { "mime_type": 1 }))
.build();
let result = self
.client
.database(_database)
.collection::<mongodb::bson::Document>("fs.files")
.find_one(doc! { "_id": file_id })
.with_options(find_options)
.await
.map_err(|e| {
log::error!("get_file_with_opts: error={:?}", e);
anyhow::anyhow!(e)
})?;
let mime_type = match result {
Some(document) => {
if let Some(mongodb::bson::Bson::String(mime_type)) = document.get("mime_type") {
Some(mime_type.to_string())
} else {
None
}
}
None => None,
};
Ok((mime_type, cursor))
}
pub async fn insert_one<T>(
&self,
database: &str,
collection: &str,
model: impl Borrow<T>,
) -> Result<String>
where
T: serde::Serialize + Send + Sync,
{
let result = self
.client
.database(database)
.collection(collection)
.insert_one(model)
.await
.map_err(|e| {
log::error!("insert_one: error={:?}", e);
anyhow::anyhow!(e)
})?;
if let Some(s) = result.inserted_id.as_object_id() {
Ok(s.to_string())
} else {
Err(Error::UnexpectedError(anyhow::anyhow!("save doc failed!")))
}
}
pub async fn find_one<T>(&self, database: &str, collection: &str, id: &str) -> Result<Option<T>>
where
T: serde::de::DeserializeOwned + Unpin + Send + Sync,
{
let obj_id = ObjectId::parse_str(id)?;
let filter = doc! {"_id": obj_id};
let result = self
.client
.database(database)
.collection(collection)
.find_one(filter)
.await
.map_err(|e| {
log::error!("find_one: error={:?}", e);
anyhow::anyhow!(e)
})?;
Ok(result)
}
pub async fn exists_by_sub_match(
&self,
database: &str,
collection: &str,
filter: mongodb::bson::Document,
options: mongodb::bson::Document,
) -> Result<bool> {
let options = FindOneOptions::builder().projection(options).build();
if let Ok(result) = self
.client
.database(database)
.collection::<mongodb::bson::Document>(collection)
.find_one(filter)
.with_options(options)
.await
{
if result.is_some() {
return Ok(true);
}
}
Ok(false)
}
pub async fn delete_one<T>(&self, database: &str, collection: &str, id: &str) -> Result<bool>
where
T: serde::Serialize + Send + Sync,
{
let obj_id = ObjectId::parse_str(id)?;
let filter = doc! {"_id": obj_id};
let result = self
.client
.database(database)
.collection::<T>(collection)
.delete_one(filter)
.await
.map_err(|e| {
log::error!("delete_one: error={:?}", e);
anyhow::anyhow!(e)
})?;
Ok(result.deleted_count > 0)
}
pub async fn find<T>(
&self,
database: &str,
collection: &str,
filter: mongodb::bson::Document,
) -> Result<Vec<T>>
where
T: serde::de::DeserializeOwned + Send,
{
let mut cursor = self
.client
.database(database)
.collection::<mongodb::bson::Document>(collection)
.find(filter)
.await
.map_err(|e| {
log::error!("find: error={:?}", e);
anyhow::anyhow!(e)
})?;
let mut docs: Vec<T> = Vec::new();
while let Some(doc) = cursor
.try_next()
.await
.expect("Error mapping through cursor")
{
let doc: T = bson::deserialize_from_bson(bson::Bson::Document(doc)).map_err(|e| {
log::error!("find-from_bson: error={:?}", e);
anyhow::anyhow!(e)
})?;
docs.push(doc)
}
Ok(docs)
}
pub async fn select<T>(
&self,
database: &str,
collection: &str,
filter: mongodb::bson::Document,
sort: mongodb::bson::Document,
range: (u32, u32), ) -> Result<Vec<T>>
where
T: serde::de::DeserializeOwned + Send,
{
if range.1 == 0 {
return Ok(Vec::new());
}
let find_options = FindOptions::builder()
.sort(sort)
.limit(Some(range.1 as i64)) .skip(Some(range.0 as u64)) .build();
let mut cursor = self
.client
.database(database)
.collection::<mongodb::bson::Document>(collection)
.find(filter)
.with_options(find_options)
.await
.map_err(|e| {
log::error!("select: error={:?}", e);
anyhow::anyhow!(e)
})?;
let mut docs: Vec<T> = Vec::new();
while let Some(doc) = cursor.try_next().await.map_err(|e| {
log::error!("select-try_next: error={:?}", e);
anyhow::anyhow!(e)
})? {
let doc: T = bson::deserialize_from_bson(bson::Bson::Document(doc)).map_err(|e| {
log::error!("select-from_bson: error={:?}", e);
anyhow::anyhow!(e)
})?;
docs.push(doc)
}
Ok(docs)
}
pub async fn distinct<T>(
&self,
database: &str,
collection: &str,
field_name: &str,
filter: mongodb::bson::Document,
) -> Result<Vec<T>>
where
T: serde::de::DeserializeOwned + Send,
{
let distinct_options = DistinctOptions::builder().build();
let result = self
.client
.database(database)
.collection::<mongodb::bson::Document>(collection)
.distinct(field_name, filter)
.with_options(distinct_options)
.await
.map_err(|e| {
log::error!("select: error={:?}", e);
anyhow::anyhow!(e)
})?;
let mut docs: Vec<T> = Vec::new();
for item in result {
let doc: T = bson::deserialize_from_bson(item).map_err(|e| {
log::error!("distinct-from_bson: error={:?}", e);
anyhow::anyhow!(e)
})?;
docs.push(doc);
}
Ok(docs)
}
pub async fn update_one(
&self,
database: &str,
collection: &str,
id: &str,
update: mongodb::bson::Document,
) -> Result<bool> {
let filter = doc! {"_id": ObjectId::parse_str(id)?};
let options = UpdateOptions::builder().upsert(true).build();
let result = self
.client
.database(database)
.collection::<mongodb::bson::Document>(collection)
.update_one(filter, update)
.with_options(Some(options))
.await
.map_err(|e| {
log::error!("update_one: error={:?}", e);
anyhow::anyhow!(e)
})?;
Ok(result.modified_count > 0)
}
pub async fn update_set(
&self,
database: &str,
collection: &str,
id: &str,
json_object: serde_json::Value,
) -> Result<bool> {
let filter = doc! {"_id": ObjectId::parse_str(id)?};
let options = UpdateOptions::builder().upsert(true).build();
let _bson_value = bson::serialize_to_bson(&json_object)?;
let mut update_set = Document::new();
update_set.insert("$set", _bson_value);
let result = self
.client
.database(database)
.collection::<mongodb::bson::Document>(collection)
.update_one(filter, update_set)
.with_options(Some(options))
.await
.map_err(|e| {
log::error!("update_set: error={:?}", e);
anyhow::anyhow!(e)
})?;
Ok(result.modified_count > 0)
}
pub async fn pipeline(
&self,
database: &str,
collection: &str,
pipeline: Vec<Document>,
) -> Result<Vec<(String, i32)>> {
let _opts = AggregateOptions::builder()
.read_concern(ReadConcern::available())
.allow_disk_use(true)
.write_concern(WriteConcern::builder().w(None).build())
.build();
let mut cursor = self
.client
.database(database)
.collection::<mongodb::bson::Document>(collection)
.aggregate(pipeline)
.with_options(Some(_opts))
.await
.map_err(|e| {
log::error!("pipeline: error={:?}", e);
anyhow::anyhow!(e)
})?;
let mut docs: Vec<(String, i32)> = Vec::new();
while let Some(document) = cursor.try_next().await.map_err(|e| {
log::error!("pipeline-try_next: error={:?}", e);
anyhow::anyhow!(e)
})? {
match document.get_i32("_cnt") {
Ok(count) => match document.get_object_id("_id") {
Ok(id) => {
docs.push((id.to_string(), count));
}
Err(_) => match document.get_str("_id") {
Ok(id) => {
docs.push((id.to_string(), count));
}
Err(e2) => {
log::error!("pipeline: error={:?}", e2);
}
},
},
Err(e) => {
log::error!("pipeline: error={:?}", e);
docs.push((
document
.get_object_id("_id")
.map_err(|e| Error::UnexpectedError(anyhow::anyhow!(e)))?
.to_string(),
0,
));
}
}
}
Ok(docs)
}
}