use mongodb::error::Error;
use mongodb::options::{
FindOneOptions, FindOptions, InsertOneOptions, ListCollectionsOptions, SelectionCriteria,
UpdateOptions,
};
use mongodb::{Collection, Database};
use md5::{Digest, Md5};
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use typed_builder::TypedBuilder;
use bson::{doc, oid::ObjectId, DateTime, Document};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_stream::{Stream, StreamExt};
use mongodb::options::{ReadConcern, ReadPreference, WriteConcern};
/// Errors produced by GridFS bucket operations.
#[derive(Debug)]
pub enum GridFSError {
    // Wraps any error surfaced by the underlying MongoDB driver.
    MongoError(mongodb::error::Error),
    // The requested file id has no matching document in the files collection.
    FileNotFound(),
}
impl From<mongodb::error::Error> for GridFSError {
fn from(err: mongodb::error::Error) -> GridFSError {
GridFSError::MongoError(err)
}
}
impl std::error::Error for GridFSError {
    /// Exposes the wrapped driver error as the underlying cause, if any.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            GridFSError::MongoError(e) => Some(e),
            GridFSError::FileNotFound() => None,
        }
    }
    // The deprecated `description()` and `cause()` overrides were removed:
    // both duplicated the standard library's default implementations byte for
    // byte (`description`'s default returns the same static deprecation
    // string, and `cause`'s default forwards to `source`), so behavior is
    // unchanged while the deprecated-method overrides go away.
}
impl Display for GridFSError {
    /// Human-readable rendering: driver errors delegate to their own
    /// `Display`; a missing file uses a fixed message.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            GridFSError::MongoError(inner) => Display::fmt(inner, f),
            GridFSError::FileNotFound() => f.write_str("File not found"),
        }
    }
}
/// Callback used to report upload progress.
pub trait ProgressUpdate {
    /// Called after each chunk is persisted; `position` is the total number
    /// of bytes written so far.
    fn update(&self, position: usize);
}
/// Per-upload options for `GridFS::upload_from_stream`.
#[derive(Clone, Default, TypedBuilder)]
pub struct GridFSUploadOptions {
    // Overrides the bucket-level chunk size for this upload when set.
    #[builder(default = None)]
    pub(crate) chunk_size_bytes: Option<u32>,
    // Arbitrary user metadata stored in the files document under "metadata".
    #[builder(default = None)]
    pub(crate) metadata: Option<Document>,
    // Accepted for API compatibility but currently unused.
    #[allow(dead_code)]
    #[builder(default = None)]
    content_type: Option<String>,
    // Accepted for API compatibility but currently unused.
    #[allow(dead_code)]
    #[builder(default = None)]
    aliases: Option<Vec<String>>,
    // Invoked with the running byte count after each chunk insert.
    #[builder(default = None)]
    pub(crate) progress_tick: Option<Arc<dyn ProgressUpdate + Send + Sync>>,
}
/// Bucket-level GridFS configuration.
#[derive(Clone, Debug, TypedBuilder)]
pub struct GridFSBucketOptions {
    // Prefix for the backing collections: "<bucket_name>.files" / ".chunks".
    #[builder(default = "fs".into())]
    pub bucket_name: String,
    // Size of each stored chunk, in bytes (255 KiB by default).
    #[builder(default = 255 * 1024)]
    pub chunk_size_bytes: u32,
    // Write concern applied to upload inserts/updates when set.
    #[builder(default)]
    pub write_concern: Option<WriteConcern>,
    // Read concern applied to download queries when set.
    #[builder(default)]
    pub read_concern: Option<ReadConcern>,
    // Read preference applied to download queries when set.
    #[builder(default)]
    pub read_preference: Option<ReadPreference>,
    // When true, skip computing/storing the "md5" field on uploads.
    #[builder(default = false)]
    pub disable_md5: bool,
}
impl Default for GridFSBucketOptions {
fn default() -> Self {
GridFSBucketOptions {
bucket_name: "fs".into(),
chunk_size_bytes: 255 * 1024,
write_concern: None,
read_concern: None,
read_preference: None,
disable_md5: false,
}
}
}
/// Handle to a GridFS bucket backed by `db`.
#[derive(Clone, Debug)]
pub struct GridFS {
    pub(crate) db: Database,
    // Bucket configuration; `None` means `GridFSBucketOptions::default()`.
    pub(crate) options: Option<GridFSBucketOptions>,
    // True until the first upload; gates the one-time index provisioning
    // performed by `ensure_file_index`.
    pub(crate) never_write: bool,
}
impl GridFS {
/// Creates a bucket handle over `db`; `None` options fall back to defaults.
pub fn new(db: Database, options: Option<GridFSBucketOptions>) -> GridFS {
    // A fresh handle has performed no writes yet, so index provisioning
    // is still pending.
    GridFS { db, options, never_write: true }
}
/// Issues a raw `createIndexes` command building the compound
/// `{filename: 1, uploadDate: 1.0}` index on the files collection.
async fn create_files_index(&self, collection_name: &str) -> Result<Document, Error> {
    let index_name = format!("{}_index", collection_name);
    let command = doc! {
        "createIndexes": collection_name,
        "indexes": [{
            "key": {
                "filename": 1,
                "uploadDate": 1.0
            },
            "name": index_name,
        }],
    };
    self.db.run_command(command).await
}
/// Issues a raw `createIndexes` command building the compound
/// `{files_id: 1, n: 1}` index on the chunks collection.
async fn create_chunks_index(&self, collection_name: &str) -> Result<Document, Error> {
    let index_name = format!("{}_index", collection_name);
    let command = doc! {
        "createIndexes": collection_name,
        "indexes": [{
            "key": {
                "files_id": 1,
                "n": 1
            },
            "name": index_name,
        }],
    };
    self.db.run_command(command).await
}
#[rustfmt::skip]
async fn ensure_file_index(
&mut self,
files: &Collection<Document>,
file_collection: &str,
chunk_collection: &str,
) -> Result<(), Error> {
if self.never_write {
if files
.find_one(
doc! {},
).with_options(
FindOneOptions::builder()
.projection(doc! { "_id": 1 })
.build())
.await
.ok()
== Some(None)
{
{
let options = ListCollectionsOptions::builder()
.filter(doc! { "name": &file_collection })
.build();
let is_collection_exists = self
.db
.list_collection_names()
.with_options(Some(options))
.await?;
if is_collection_exists.is_empty() {
self.db
.create_collection(file_collection.to_string())
.await?
}
let indexes = self
.db
.run_command(doc! {"listIndexes":file_collection})
.await?;
let mut have_index = false;
for index in indexes
.get_document("cursor")
.unwrap()
.get_array("firstBatch")
.unwrap()
{
let key = index.as_document().unwrap().get_document("key").unwrap();
let filename = key.get_i32("filename");
let upload_date = key.get_i32("uploadDate");
let filename_f = key.get_f64("filename");
let upload_date_f = key.get_f64("uploadDate");
match (filename, upload_date, filename_f, upload_date_f) {
(Ok(1), Ok(1), _, _) => {
have_index = true;
}
(_, _, Ok(x), Ok(y))
if (x - 1.0).abs() < 0.0001 && (y - 1.0).abs() < 0.0001 =>
{
have_index = true;
}
(Ok(1), _, _, Ok(x)) if (x - 1.0).abs() < 0.0001 => {
have_index = true;
}
(_, Ok(1), Ok(x), _) if (x - 1.0).abs() < 0.0001 => {
have_index = true;
}
_ => {}
}
}
if !have_index {
self.create_files_index(file_collection).await?;
}
}
{
let options = ListCollectionsOptions::builder()
.filter(doc! { "name": &chunk_collection })
.build();
let is_collection_exists = self
.db
.list_collection_names()
.with_options(Some(options))
.await?;
if is_collection_exists.is_empty() {
self.db
.create_collection(chunk_collection.to_string())
.await?
}
let indexes = self
.db
.run_command(doc! {"listIndexes":chunk_collection})
.await?;
let mut have_index = false;
for index in indexes
.get_document("cursor")
.unwrap()
.get_array("firstBatch")
.unwrap()
{
let key = index.as_document().unwrap().get_document("key").unwrap();
let files_id = key.get_i32("files_id");
let n = key.get_i32("n");
let files_id_f = key.get_f64("files_id");
let n_f = key.get_f64("n");
match (files_id, n, files_id_f, n_f) {
(Ok(1), Ok(1), _, _) => {
have_index = true;
}
(_, _, Ok(x), Ok(y))
if (x - 1.0).abs() < 0.0001 && (y - 1.0).abs() < 0.0001 =>
{
have_index = true;
}
(Ok(1), _, _, Ok(x)) if (x - 1.0).abs() < 0.0001 => {
have_index = true;
}
(_, Ok(1), Ok(x), _) if (x - 1.0).abs() < 0.0001 => {
have_index = true;
}
_ => {}
}
}
if !have_index {
self.create_chunks_index(chunk_collection).await?;
}
}
}
self.never_write = false;
}
Ok(())
}
/// Uploads the bytes read from `source` as a new GridFS file named
/// `filename`, returning the `_id` of the created files document.
///
/// Flow: (1) ensure indexes exist, (2) insert the files document,
/// (3) stream `source` into chunk documents of `chunk_size` bytes,
/// (4) patch the files document with the final length, upload date, and
/// (unless `disable_md5`) the MD5 digest of the content.
///
/// # Errors
/// Returns any driver error from the index/insert/update operations, and
/// any I/O error from reading `source`.
#[rustfmt::skip]
pub async fn upload_from_stream(
    &mut self,
    filename: &str,
    mut source: impl AsyncRead + Unpin,
    options: Option<GridFSUploadOptions>,
) -> Result<ObjectId, Error> {
    let dboptions = self.options.clone().unwrap_or_default();
    let mut chunk_size: u32 = dboptions.chunk_size_bytes;
    let bucket_name = dboptions.bucket_name;
    let file_collection = bucket_name.clone() + ".files";
    let disable_md5 = dboptions.disable_md5;
    let chunk_collection = bucket_name + ".chunks";
    let mut progress_tick = None;
    // Per-upload options may override the bucket-level chunk size and
    // provide a progress callback.
    if let Some(options) = options.clone() {
        if let Some(chunk_size_bytes) = options.chunk_size_bytes {
            chunk_size = chunk_size_bytes;
        }
        progress_tick = options.progress_tick;
    }
    let files = self.db.collection(&file_collection);
    self.ensure_file_index(&files, &file_collection, &chunk_collection).await?;
    // The files document is inserted first WITHOUT length/uploadDate/md5;
    // those are filled in by the final $set once all chunks are stored.
    let mut file_document = doc! {"filename":filename, "chunkSize":chunk_size};
    if let Some(options) = options {
        if let Some(metadata) = options.metadata {
            file_document.insert("metadata", metadata);
        }
    }
    let mut insert_option = InsertOneOptions::builder().build();
    if let Some(write_concern) = dboptions.write_concern.clone() {
        insert_option.write_concern = Some(write_concern);
    }
    let insert_file_result = files
        .insert_one(file_document)
        .with_options(Some(insert_option.clone()))
        .await?;
    // No _id was supplied, so the driver-generated id is expected to be an
    // ObjectId; unwrap panics otherwise.
    let files_id = insert_file_result.inserted_id.as_object_id().unwrap();
    let mut md5 = Md5::default();
    let chunks = self.db.collection(&chunk_collection);
    let mut vecbuf: Vec<u8> = vec![0; chunk_size as usize];
    let mut length: usize = 0;   // total bytes written so far
    let mut n: u32 = 0;          // zero-based chunk counter
    loop {
        // Fill the chunk buffer completely (AsyncRead may return short
        // reads); `read` returning 0 means either end-of-stream or that
        // the remaining slice is empty (buffer full).
        let chunk_read_size = {
            let mut chunk_read_size = 0;
            loop {
                let buffer = &mut vecbuf[chunk_read_size..];
                let step_read_size = source.read(buffer).await?;
                if step_read_size == 0 {
                    break;
                }
                chunk_read_size += step_read_size;
            }
            if chunk_read_size == 0 {
                break; // nothing left to read: upload of chunks is done
            }
            chunk_read_size
        };
        let bin: Vec<u8> = Vec::from(&vecbuf[..chunk_read_size]);
        md5.update(&bin);
        chunks
            .insert_one(
                doc! {
                    "files_id": files_id,
                    "n": n,
                    "data": bson::Binary{subtype: bson::spec::BinarySubtype::Generic, bytes:bin}
                },
            )
            .with_options(Some(insert_option.clone()))
            .await?;
        length += chunk_read_size;
        n += 1;
        // Report cumulative progress after each persisted chunk.
        if let Some(ref progress_tick) = progress_tick {
            progress_tick.update(length);
        };
    }
    // Finalize the files document with the totals gathered while chunking.
    let mut update = doc! { "length": length as i64, "uploadDate": DateTime::now() };
    if !disable_md5 {
        update.insert("md5", format!("{:02x}", md5.finalize()));
    }
    let mut update_option = UpdateOptions::builder().build();
    if let Some(write_concern) = dboptions.write_concern {
        update_option.write_concern = Some(write_concern);
    }
    files
        .update_one(
            doc! {"_id":files_id},
            doc! {"$set":update},
        )
        .with_options(Some(update_option))
        .await?;
    Ok(files_id)
}
/// Convenience wrapper around `open_download_stream_with_filename` that
/// discards the filename and yields only the chunk-data stream.
pub async fn open_download_stream(
    &self,
    id: ObjectId,
) -> Result<impl Stream<Item = Vec<u8>>, GridFSError> {
    self.open_download_stream_with_filename(id)
        .await
        .map(|(stream, _filename)| stream)
}
/// Finds the files document for `id` and returns a stream of its chunk
/// payloads (ordered by chunk number `n`) together with the stored filename.
///
/// # Errors
/// * `GridFSError::FileNotFound` when no files document matches `id`.
/// * `GridFSError::MongoError` for driver failures on either query.
pub async fn open_download_stream_with_filename(
    &self,
    id: ObjectId,
) -> Result<(impl Stream<Item = Vec<u8>>, String), GridFSError> {
    let dboptions = self.options.clone().unwrap_or_default();
    let bucket_name = dboptions.bucket_name;
    let file_collection = bucket_name.clone() + ".files";
    let files = self.db.collection::<Document>(&file_collection);
    let chunk_collection = bucket_name + ".chunks";
    let chunks = self.db.collection::<Document>(&chunk_collection);
    // Apply bucket-level read concern / preference to both queries.
    let mut find_one_options = FindOneOptions::builder().build();
    let mut find_options = FindOptions::builder().sort(doc! {"n":1}).build();
    if let Some(read_concern) = dboptions.read_concern {
        find_one_options.read_concern = Some(read_concern.clone());
        find_options.read_concern = Some(read_concern);
    }
    if let Some(read_preference) = dboptions.read_preference {
        find_one_options.selection_criteria =
            Some(SelectionCriteria::ReadPreference(read_preference.clone()));
        find_options.selection_criteria =
            Some(SelectionCriteria::ReadPreference(read_preference));
    }
    let file = files
        .find_one(doc! {"_id":id})
        .with_options(find_one_options)
        .await?;
    if let Some(file) = file {
        // Files documents written by `upload_from_stream` always carry a
        // string "filename" field; a missing/non-string value is treated
        // as a corrupt document and panics (unchanged behavior).
        let filename = file.get_str("filename").unwrap().to_string();
        // Propagate query failures as GridFSError instead of panicking
        // (this was previously `.unwrap()`).
        let cursor = chunks
            .find(doc! {"files_id":id})
            .with_options(Some(find_options))
            .await?;
        // Per-item decode errors still panic: the stream's Item type is
        // Vec<u8>, so there is nowhere to surface them without an API change.
        let stream = cursor.map(|item| {
            let i = item.unwrap();
            i.get_binary_generic("data").unwrap().clone()
        });
        Ok((stream, filename))
    } else {
        Err(GridFSError::FileNotFound())
    }
}
}