use crate::StorageConfig;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use futures_util::{AsyncWriteExt, StreamExt};
use mongodb::{
bson::{doc, raw::ValueAccessErrorKind, Bson, Document, RawDocument},
gridfs::GridFsBucket,
options::GridFsUploadOptions,
Client, Database,
};
use remi::{Blob, File, ListBlobsRequest, UploadRequest};
use std::{borrow::Cow, collections::HashMap, io, path::Path};
use tokio_util::{compat::FuturesAsyncReadCompatExt, io::ReaderStream};
fn value_access_err_to_error(error: mongodb::bson::raw::ValueAccessError) -> mongodb::error::Error {
match error.kind {
ValueAccessErrorKind::NotPresent => {
mongodb::error::Error::custom(format!("key [{}] was not found", error.key()))
}
ValueAccessErrorKind::UnexpectedType { expected, actual, .. } => mongodb::error::Error::custom(format!(
"expected BSON type '{expected:?}', actual type for key [{}] is '{actual:?}'",
error.key()
)),
ValueAccessErrorKind::InvalidBson(err) => err.into(),
_ => unimplemented!(
"`ValueAccessErrorKind` was unhandled, please report it: https://github.com/Noelware/remi-rs/issues/new"
),
}
}
/// Converts a GridFS "files" collection entry into a remi [`File`].
///
/// `bytes` is the already-downloaded file content; `doc` is the raw BSON
/// document describing the file (filename, length, upload date, metadata).
fn document_to_blob(bytes: Bytes, doc: &RawDocument) -> Result<File, mongodb::error::Error> {
    let filename = doc.get_str("filename").map_err(value_access_err_to_error)?;
    let length = doc.get_i64("length").map_err(value_access_err_to_error)?;
    let created_at = doc.get_datetime("uploadDate").map_err(value_access_err_to_error)?;
    let metadata = doc.get_document("metadata").map_err(value_access_err_to_error)?;

    // `contentType` is optional: a missing key maps to `None`; any other
    // access failure (wrong type, invalid BSON) is propagated as an error.
    // (Previously a `NotPresent` error re-did the same lookup and turned the
    // inevitable second `NotPresent` into a hard error.)
    let content_type = match metadata.get_str("contentType") {
        Ok(res) => Some(res),
        Err(e) => match e.kind {
            ValueAccessErrorKind::NotPresent => None,
            _ => return Err(value_access_err_to_error(e)),
        },
    };

    // Copy the remaining string-valued metadata entries; non-string values
    // are silently skipped.
    let mut map = HashMap::new();
    for ref_ in metadata.into_iter() {
        let (name, value) = ref_?;
        if name != "contentType" {
            if let Some(s) = value.as_str() {
                map.insert(name.into(), s.into());
            }
        }
    }

    Ok(File {
        last_modified_at: None,
        content_type: content_type.map(String::from),
        metadata: map,
        created_at: if created_at.timestamp_millis() < 0 {
            #[cfg(feature = "tracing")]
            ::tracing::warn!(%filename, "`created_at` timestamp was negative");

            #[cfg(feature = "log")]
            ::log::warn!("`created_at` for file [{}] was negative", filename);

            None
        } else {
            Some(
                u128::try_from(created_at.timestamp_millis())
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
            )
        },
        is_symlink: false,
        data: bytes,
        name: filename.to_owned(),
        path: format!("gridfs://{filename}"),
        size: if length < 0 {
            // A negative stored length is nonsensical; clamp to zero.
            0
        } else {
            length
                .try_into()
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
        },
    })
}
fn resolve_path(path: &Path) -> Result<String, mongodb::error::Error> {
let path = path.to_str().ok_or_else(|| {
<mongodb::error::Error as From<io::Error>>::from(io::Error::new(
io::ErrorKind::InvalidData,
"expected valid utf-8 string",
))
})?;
let path = path.trim_start_matches("~/").trim_start_matches("./");
Ok(path.to_owned())
}
/// A [`remi::StorageService`] implementation backed by MongoDB GridFS.
#[derive(Debug, Clone)]
pub struct StorageService {
    // `Some` when built from a `StorageConfig`; `None` when wrapped around an
    // existing bucket via `with_bucket` (upload then falls back to defaults).
    config: Option<StorageConfig>,
    bucket: GridFsBucket,
}
impl StorageService {
    /// Creates a [`StorageService`] over a GridFS bucket derived from `db`,
    /// using the bucket options carried by `config`.
    pub fn new(db: Database, config: StorageConfig) -> StorageService {
        let bucket = db.gridfs_bucket(Some(config.clone().into()));
        StorageService {
            config: Some(config),
            bucket,
        }
    }

    /// Creates a [`StorageService`] from an existing MongoDB [`Client`],
    /// selecting `config.database` (or `"mydb"` when unset) as the database.
    pub fn from_client(client: &Client, config: StorageConfig) -> StorageService {
        // Borrow the database name instead of cloning the whole config and
        // eagerly allocating the fallback string.
        let database = client.database(config.database.as_deref().unwrap_or("mydb"));
        Self::new(database, config)
    }

    /// Connects to MongoDB using `conn_string` and builds a [`StorageService`]
    /// from the resulting client.
    ///
    /// # Errors
    /// Returns any error produced while constructing the client.
    pub async fn from_conn_string<C: AsRef<str>>(
        conn_string: C,
        config: StorageConfig,
    ) -> Result<StorageService, mongodb::error::Error> {
        let client = Client::with_uri_str(conn_string).await?;
        Ok(Self::from_client(&client, config))
    }

    /// Wraps an already-constructed [`GridFsBucket`]; no configuration is
    /// retained, so upload options fall back to their defaults.
    pub fn with_bucket(bucket: GridFsBucket) -> StorageService {
        StorageService { config: None, bucket }
    }

    // Normalizes a caller-supplied path into the GridFS filename key.
    fn resolve_path<P: AsRef<Path>>(&self, path: P) -> Result<String, mongodb::error::Error> {
        resolve_path(path.as_ref())
    }
}
#[async_trait]
impl remi::StorageService for StorageService {
type Error = mongodb::error::Error;
fn name(&self) -> Cow<'static, str> {
Cow::Borrowed("remi:gridfs")
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "remi.gridfs.open",
skip_all,
fields(
remi.service = "gridfs",
path = %path.as_ref().display()
)
)
)]
async fn open<P: AsRef<Path> + Send>(&self, path: P) -> Result<Option<Bytes>, Self::Error> {
let path = self.resolve_path(path)?;
#[cfg(feature = "tracing")]
::tracing::info!(file = %path, "opening file");
#[cfg(feature = "log")]
::log::info!("opening file [{}]", path);
let mut cursor = self.bucket.find(doc! { "filename": &path }).await?;
let advanced = cursor.advance().await?;
if !advanced {
#[cfg(feature = "tracing")]
::tracing::warn!(
file = %path,
"file doesn't exist in GridFS"
);
#[cfg(feature = "log")]
::log::warn!("file [{}] doesn't exist in GridFS", path);
return Ok(None);
}
let doc = cursor.current();
let stream = self
.bucket
.open_download_stream(Bson::ObjectId(
doc.get_object_id("_id").map_err(value_access_err_to_error)?,
))
.await?;
let mut bytes = BytesMut::new();
let mut reader = ReaderStream::new(stream.compat());
while let Some(raw) = reader.next().await {
match raw {
Ok(b) => bytes.extend(b),
Err(e) => return Err(e.into()),
}
}
Ok(Some(bytes.into()))
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "remi.gridfs.blob",
skip_all,
fields(
remi.service = "gridfs",
path = %path.as_ref().display()
)
)
)]
async fn blob<P: AsRef<Path> + Send>(&self, path: P) -> Result<Option<Blob>, Self::Error> {
let path = self.resolve_path(path)?;
let Some(bytes) = self.open(&path).await? else {
return Ok(None);
};
#[cfg(feature = "tracing")]
::tracing::info!(
file = %path,
"getting file metadata for file"
);
#[cfg(feature = "log")]
::log::info!("getting file metadata for file [{}]", path);
let mut cursor = self
.bucket
.find(doc! {
"filename": &path,
})
.await?;
let has_advanced = cursor.advance().await?;
if !has_advanced {
#[cfg(feature = "tracing")]
::tracing::warn!(file = %path, "file doesn't exist");
#[cfg(feature = "log")]
::log::warn!("file [{}] doesn't exist", path);
return Ok(None);
}
let doc = cursor.current();
document_to_blob(bytes, doc).map(|doc| Some(Blob::File(doc)))
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "remi.gridfs.blobs",
skip_all,
fields(
remi.service = "gridfs"
)
)
)]
async fn blobs<P: AsRef<Path> + Send>(
&self,
path: Option<P>,
_request: Option<ListBlobsRequest>,
) -> Result<Vec<Blob>, Self::Error> {
#[allow(unused)]
if let Some(path) = path {
#[cfg(feature = "tracing")]
::tracing::warn!(
file = %path.as_ref().display(),
"using blobs() with a given file name is not supported",
);
#[cfg(feature = "log")]
::log::warn!(
"using blobs() with a given file name [{}] is not supported",
path.as_ref().display()
);
return Ok(vec![]);
}
let mut cursor = self.bucket.find(doc!()).await?;
let mut blobs = vec![];
while cursor.advance().await? {
let doc = cursor.current();
let stream = self
.bucket
.open_download_stream(Bson::ObjectId(
doc.get_object_id("_id").map_err(value_access_err_to_error)?,
))
.await?;
let mut bytes = BytesMut::new();
let mut reader = ReaderStream::new(stream.compat());
while let Some(raw) = reader.next().await {
match raw {
Ok(b) => bytes.extend(b),
Err(e) => return Err(e.into()),
}
}
match document_to_blob(bytes.into(), doc) {
Ok(blob) => blobs.push(Blob::File(blob)),
#[cfg(any(feature = "tracing", feature = "log"))]
Err(e) => {
#[cfg(feature = "tracing")]
::tracing::error!(error = %e, "unable to convert to a file");
#[cfg(feature = "log")]
::log::error!("unable to convert to a file: {e}");
}
#[cfg(not(any(feature = "tracing", feature = "log")))]
Err(_e) => {}
}
}
Ok(blobs)
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "remi.gridfs.delete",
skip_all,
fields(
remi.service = "gridfs",
path = %path.as_ref().display()
)
)
)]
async fn delete<P: AsRef<Path> + Send>(&self, path: P) -> Result<(), Self::Error> {
let path = self.resolve_path(path)?;
#[cfg(feature = "tracing")]
::tracing::info!(file = %path, "deleting file");
#[cfg(feature = "log")]
::log::info!("deleting file [{}]", path);
let mut cursor = self
.bucket
.find(doc! {
"filename": &path,
})
.await?;
let has_advanced = cursor.advance().await?;
if !has_advanced {
#[cfg(feature = "tracing")]
::tracing::warn!(file = %path, "file doesn't exist");
#[cfg(feature = "log")]
::log::warn!("file [{}] doesn't exist", path);
return Ok(());
}
let doc = cursor.current();
let oid = doc.get_object_id("_id").map_err(value_access_err_to_error)?;
self.bucket.delete(Bson::ObjectId(oid)).await
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "remi.gridfs.exists",
skip_all,
fields(
remi.service = "gridfs",
path = %path.as_ref().display()
)
)
)]
async fn exists<P: AsRef<Path> + Send>(&self, path: P) -> Result<bool, Self::Error> {
match self.open(path).await {
Ok(Some(_)) => Ok(true),
Ok(None) => Ok(false),
Err(e) => Err(e),
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "remi.gridfs.blob",
skip_all,
fields(
remi.service = "gridfs",
path = %path.as_ref().display()
)
)
)]
async fn upload<P: AsRef<Path> + Send>(&self, path: P, options: UploadRequest) -> Result<(), Self::Error> {
let path = self.resolve_path(path)?;
#[cfg(feature = "tracing")]
::tracing::info!(
file = %path,
"uploading file to GridFS..."
);
#[cfg(feature = "log")]
::log::info!("uploading file [{}] to GridFS", path);
let mut metadata = options
.metadata
.into_iter()
.map(|(key, value)| (key, Bson::String(value)))
.collect::<Document>();
if let Some(ct) = options.content_type {
metadata.insert("contentType", ct);
}
let opts = GridFsUploadOptions::builder()
.chunk_size_bytes(Some(
self.config.clone().unwrap_or_default().chunk_size.unwrap_or(255 * 1024),
))
.metadata(metadata)
.build();
let mut stream = self.bucket.open_upload_stream(path).with_options(opts).await?;
stream.write_all(&options.data[..]).await?;
stream.close().await.map_err(From::from)
}
}