use std::{
io::{self, Cursor, ErrorKind},
path::Path,
str::FromStr,
time::Duration,
};
use short_crypt::ShortCrypt;
use crate::{
bson::{
document::{Document, ValueAccessError},
oid::ObjectId,
spec::BinarySubtype,
Binary, Bson, DateTime,
},
functions::*,
mime::Mime,
mongodb::{
options::{
ClientOptions, FindOneAndUpdateOptions, FindOneOptions, FindOptions, IndexOptions,
ReturnDocument, UpdateOptions,
},
results::DeleteResult,
Client, Collection, Database, IndexModel,
},
tokio::{
fs::File,
io::{AsyncRead, AsyncReadExt},
},
tokio_stream::{Stream, StreamExt},
Digest, FileCenterError, FileData, FileItem, Hasher, IDToken, DEFAULT_MIME_TYPE,
};
/// Database name used when the connection URI does not name a database.
pub const DEFAULT_DATABASE_NAME: &str = "test";
/// Name of the collection that stores the file item documents.
pub const COLLECTION_FILES_NAME: &str = "file_center";
/// Name of the collection that stores the chunk documents of chunked files.
pub const COLLECTION_FILES_CHUNKS_NAME: &str = "file_center_chunks";
/// Name of the collection that stores the file center settings.
pub const COLLECTION_SETTINGS_NAME: &str = "file_center_settings";
/// `_id` of the settings document that holds the file size threshold.
pub const SETTING_FILE_SIZE_THRESHOLD: &str = "file_size_threshold";
/// `_id` of the settings document that holds the creation time of the file center.
pub const SETTING_CREATE_TIME: &str = "create_time";
/// `_id` of the settings document that holds the data version.
pub const SETTING_VERSION: &str = "version";
/// Largest file size threshold (in bytes) accepted by the constructors and the setter.
#[doc(hidden)]
pub const MAX_FILE_SIZE_THRESHOLD: u32 = 16_770_000;
/// File size threshold (in bytes) that `FileCenter::new` stores when the database has none.
#[doc(hidden)]
pub const DEFAULT_FILE_SIZE_THRESHOLD: u32 = 262_144;
// Milliseconds added to "now" to compute the `expire_at` of a temporary file item.
const TEMPORARY_LIFE_TIME: i64 = 60000;
// Milliseconds added to "now" to compute the `expire_at` of the chunks of a temporary file item.
const TEMPORARY_CHUNK_LIFE_TIME: i64 = 3600000;
// Newest data version this code supports; also written to a database that has no version yet.
const VERSION: i32 = 2; #[inline]
fn file_item_projection() -> Document {
doc! {
"_id": 1,
"create_time": 1,
"mime_type": 1,
"file_size": 1,
"file_name": 1,
"file_data": 1,
"chunk_id": 1,
"expire_at": 1,
}
}
#[inline]
fn file_exist_projection() -> Document {
doc! {
"_id": 1,
}
}
#[inline]
fn file_item_delete_projection() -> Document {
doc! {
"_id": 0,
"count": 1,
"chunk_id": 1,
"file_size": 1,
}
}
#[inline]
fn chunk_document(file_id: ObjectId, n: i64, bytes: Vec<u8>) -> Document {
doc! {
"file_id": file_id,
"n": n,
"data": bson::Binary{ subtype: bson::spec::BinarySubtype::Generic, bytes }
}
}
/// Handles to the three MongoDB collections a `FileCenter` works with.
#[derive(Debug)]
struct FileCenterCollections {
// File item documents (collection `COLLECTION_FILES_NAME`).
files: Collection<Document>,
// Chunk documents of chunked files (collection `COLLECTION_FILES_CHUNKS_NAME`).
files_chunks: Collection<Document>,
// Setting documents (collection `COLLECTION_SETTINGS_NAME`).
settings: Collection<Document>,
}
/// A file store on top of a MongoDB database.
#[derive(Debug)]
pub struct FileCenter {
// Database that contains the file center collections.
db: Database,
// Handles to the files, chunks and settings collections.
collections: FileCenterCollections,
// Files larger than this many bytes are stored as chunks; smaller ones are stored inline.
file_size_threshold: u32,
// Creation time read from (or written to) the settings collection; also seeds `short_crypt`.
_create_time: DateTime,
// Data version read from (or written to) the settings collection.
_version: i32,
// Cipher used to turn object IDs into ID tokens and back.
short_crypt: ShortCrypt,
}
impl FileCenter {
async fn create_indexes(&self) -> Result<(), FileCenterError> {
{
let create_time_index = {
let mut index = IndexModel::default();
index.keys = doc! {
"create_time": 1
};
index
};
let expire_at_index = {
let mut options = IndexOptions::default();
options.expire_after = Some(Duration::from_secs(0));
let mut index = IndexModel::default();
index.keys = doc! {
"expire_at": 1
};
index.options = Some(options);
index
};
let count_index = {
let mut index = IndexModel::default();
index.keys = doc! {
"count": 1
};
index
};
let hash_index = {
let mut options = IndexOptions::default();
options.unique = Some(true);
options.sparse = Some(true);
let mut index = IndexModel::default();
index.keys = doc! {
"hash_1": 1,
"hash_2": 1,
"hash_3": 1,
"hash_4": 1
};
index.options = Some(options);
index
};
let chunk_id_index = {
let mut options = IndexOptions::default();
options.unique = Some(true);
options.sparse = Some(true);
let mut index = IndexModel::default();
index.keys = doc! {
"chunk_id": 1,
};
index.options = Some(options);
index
};
self.collections
.files
.create_indexes(
[create_time_index, expire_at_index, count_index, hash_index, chunk_id_index],
None,
)
.await?;
}
{
let file_id_index = {
let mut index = IndexModel::default();
index.keys = doc! {
"file_id": 1,
};
index
};
let expire_at_index = {
let mut options = IndexOptions::default();
options.expire_after = Some(Duration::from_secs(0));
let mut index = IndexModel::default();
index.keys = doc! {
"expire_at": 1
};
index.options = Some(options);
index
};
self.collections
.files_chunks
.create_indexes([file_id_index, expire_at_index], None)
.await?;
}
Ok(())
}
/// Connects to the MongoDB deployment at `uri` and builds a `FileCenter` on top of it.
///
/// The database name is taken from the path of the URI; `DEFAULT_DATABASE_NAME` is used
/// when the URI names no database. The `file_size_threshold`, `create_time` and `version`
/// settings are read from the settings collection, or written with their initial values
/// when they are absent. Finally the indexes are created.
///
/// `initial_file_size_threshold` is stored only when the database has no threshold yet.
/// It is not validated here; callers are expected to pass a value in
/// `1..=MAX_FILE_SIZE_THRESHOLD`.
///
/// # Errors
/// - `FileCenterError::FileSizeThresholdError` if the stored threshold is not in
///   `1..=MAX_FILE_SIZE_THRESHOLD`.
/// - `FileCenterError::VersionError` if the stored version is not positive.
/// - `FileCenterError::DatabaseTooNewError` if the stored version is newer than `VERSION`.
/// - Any driver or document-access error raised along the way.
async fn new_with_file_size_threshold_inner<U: AsRef<str>>(
    uri: U,
    initial_file_size_threshold: u32,
) -> Result<FileCenter, FileCenterError> {
    // Extracts the database name from a connection string such as
    // `mongodb://host:27017/name?option=value`. The query string and fragment are cut off
    // first, so every offset stays relative to the same slice.
    fn database_name_from_uri(uri: &str) -> &str {
        let after_scheme = match uri.find("://") {
            Some(index) => &uri[(index + 3)..],
            None => uri,
        };
        let without_query = match after_scheme.find(|c: char| c == '?' || c == '#') {
            Some(index) => &after_scheme[..index],
            None => after_scheme,
        };
        match without_query.rfind('/') {
            Some(index) if index + 1 < without_query.len() => &without_query[(index + 1)..],
            _ => DEFAULT_DATABASE_NAME,
        }
    }

    let uri = uri.as_ref();
    let client_options = ClientOptions::parse(uri).await?;
    let client = Client::with_options(client_options)?;
    let db = client.database(database_name_from_uri(uri));

    let collection_settings = db.collection::<Document>(COLLECTION_SETTINGS_NAME);
    let collection_files = db.collection::<Document>(COLLECTION_FILES_NAME);
    let collection_files_chunks = db.collection::<Document>(COLLECTION_FILES_CHUNKS_NAME);

    // File size threshold: validate the stored value, or store the initial one.
    let file_size_threshold = match collection_settings
        .find_one(
            Some(doc! {
                "_id": SETTING_FILE_SIZE_THRESHOLD
            }),
            None,
        )
        .await?
    {
        Some(file_size_threshold) => {
            let file_size_threshold = file_size_threshold.get_i32("value")?;
            if file_size_threshold <= 0 {
                return Err(FileCenterError::FileSizeThresholdError);
            }
            // Positive, so the conversion to `u32` is lossless.
            let file_size_threshold = file_size_threshold as u32;
            if file_size_threshold > MAX_FILE_SIZE_THRESHOLD {
                return Err(FileCenterError::FileSizeThresholdError);
            }
            file_size_threshold
        },
        None => {
            collection_settings
                .insert_one(
                    doc! {
                        "_id": SETTING_FILE_SIZE_THRESHOLD,
                        "value": initial_file_size_threshold
                    },
                    None,
                )
                .await?;
            initial_file_size_threshold
        },
    };

    // Creation time: reuse the stored value, or store the current time.
    let create_time = match collection_settings
        .find_one(
            Some(doc! {
                "_id": SETTING_CREATE_TIME
            }),
            None,
        )
        .await?
    {
        Some(create_time) => *create_time.get_datetime("value")?,
        None => {
            let now = DateTime::now();
            collection_settings
                .insert_one(
                    doc! {
                        "_id": SETTING_CREATE_TIME,
                        "value": now
                    },
                    None,
                )
                .await?;
            now
        },
    };

    // Version: refuse databases written by a newer version, or store the current version.
    let version = match collection_settings
        .find_one(
            Some(doc! {
                "_id": SETTING_VERSION
            }),
            None,
        )
        .await?
    {
        Some(version) => {
            let version = version.get_i32("value")?;
            if version <= 0 {
                return Err(FileCenterError::VersionError);
            }
            if version > VERSION {
                return Err(FileCenterError::DatabaseTooNewError {
                    supported_latest: VERSION,
                    current: version,
                });
            }
            version
        },
        None => {
            collection_settings
                .insert_one(
                    doc! {
                        "_id": SETTING_VERSION,
                        "value": VERSION
                    },
                    None,
                )
                .await?;
            VERSION
        },
    };

    // The ID token cipher is keyed by the creation time of this file center.
    let short_crypt = ShortCrypt::new(format!("FileCenter-{}", create_time.timestamp_millis()));
    let file_center = FileCenter {
        db,
        collections: FileCenterCollections {
            files: collection_files,
            files_chunks: collection_files_chunks,
            settings: collection_settings,
        },
        file_size_threshold,
        _create_time: create_time,
        _version: version,
        short_crypt,
    };
    file_center.create_indexes().await?;
    Ok(file_center)
}
/// Creates a `FileCenter` connected to the MongoDB deployment at `uri`.
///
/// `DEFAULT_FILE_SIZE_THRESHOLD` is passed as the initial file size threshold.
///
/// # Errors
/// Returns whatever `new_with_file_size_threshold_inner` returns.
#[inline]
pub async fn new<U: AsRef<str>>(uri: U) -> Result<FileCenter, FileCenterError> {
Self::new_with_file_size_threshold_inner(uri, DEFAULT_FILE_SIZE_THRESHOLD).await
}
/// Creates a `FileCenter` connected to the MongoDB deployment at `uri`, passing
/// `initial_file_size_threshold` as the initial file size threshold.
///
/// # Errors
/// Returns `FileCenterError::FileSizeThresholdError` if `initial_file_size_threshold` is
/// zero or larger than `MAX_FILE_SIZE_THRESHOLD`; otherwise whatever
/// `new_with_file_size_threshold_inner` returns.
#[inline]
pub async fn new_with_file_size_threshold<U: AsRef<str>>(
    uri: U,
    initial_file_size_threshold: u32,
) -> Result<FileCenter, FileCenterError> {
    let is_valid = (1..=MAX_FILE_SIZE_THRESHOLD).contains(&initial_file_size_threshold);
    if !is_valid {
        return Err(FileCenterError::FileSizeThresholdError);
    }
    Self::new_with_file_size_threshold_inner(uri, initial_file_size_threshold).await
}
}
impl FileCenter {
/// Returns the file size threshold (in bytes) currently held by this instance.
#[inline]
pub const fn get_file_size_threshold(&self) -> u32 {
self.file_size_threshold
}
/// Changes the file size threshold, both in the settings collection and in this instance.
///
/// Nothing is written when the new value equals the current one.
///
/// # Errors
/// Returns `FileCenterError::FileSizeThresholdError` if `file_size_threshold` is zero or
/// larger than `MAX_FILE_SIZE_THRESHOLD`, or the driver error if the update fails.
pub async fn set_file_size_threshold(
    &mut self,
    file_size_threshold: u32,
) -> Result<(), FileCenterError> {
    if file_size_threshold == 0 || file_size_threshold > MAX_FILE_SIZE_THRESHOLD {
        return Err(FileCenterError::FileSizeThresholdError);
    }
    if file_size_threshold == self.file_size_threshold {
        return Ok(());
    }

    // Upsert so the setting is created if the document is missing.
    let mut upsert = UpdateOptions::default();
    upsert.upsert = Some(true);
    let filter = doc! {
        "_id": SETTING_FILE_SIZE_THRESHOLD
    };
    let update = doc! {
        "$set": {
            "value": file_size_threshold
        }
    };
    self.collections.settings.update_one(filter, update, Some(upsert)).await?;

    // Only remember the new value once the database accepted it.
    self.file_size_threshold = file_size_threshold;
    Ok(())
}
/// Drops the whole database this file center lives in and consumes the instance.
///
/// # Errors
/// Returns the driver error if the drop fails.
#[inline]
pub async fn drop_database(self) -> Result<(), FileCenterError> {
self.db.drop(None).await?;
Ok(())
}
/// Drops the files, chunks and settings collections (in that order) and consumes the
/// instance. The database itself is not dropped.
///
/// # Errors
/// Returns the driver error of the first drop that fails; later collections are then left
/// untouched.
#[inline]
pub async fn drop_file_center(self) -> Result<(), FileCenterError> {
self.collections.files.drop(None).await?;
self.collections.files_chunks.drop(None).await?;
self.collections.settings.drop(None).await?;
Ok(())
}
}
impl FileCenter {
/// Opens a stream over the chunks of the file `id`, ordered by ascending chunk number `n`.
///
/// Each item is the `data` of one chunk wrapped in a `Cursor`. A failure while fetching a
/// chunk, or a chunk without generic binary `data`, is reported as an `io::Error` of kind
/// `InvalidData` inside the stream.
///
/// # Errors
/// Returns the driver error if the query itself cannot be started.
async fn open_download_stream(
    &self,
    id: ObjectId,
) -> Result<impl Stream<Item = Result<Cursor<Vec<u8>>, io::Error>> + Unpin, FileCenterError>
{
    let collection_files_chunks = &self.collections.files_chunks;
    let mut find_options = FindOptions::default();
    find_options.sort = Some(doc! {
        "n": 1
    });
    // Propagate a failed query instead of panicking; the caller already expects a `Result`.
    let chunks = collection_files_chunks
        .find(
            doc! {
                "file_id": id
            },
            find_options,
        )
        .await?;
    Ok(chunks.map(|item| {
        item.map_err(|err| io::Error::new(ErrorKind::InvalidData, err)).and_then(|i| {
            i.get_binary_generic("data")
                .map(|v| Cursor::new(v.to_vec()))
                .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))
        })
    }))
}
async fn create_file_item(&self, mut document: Document) -> Result<FileItem, FileCenterError> {
let file_id = match document
.remove("_id")
.ok_or(FileCenterError::DocumentError(ValueAccessError::NotPresent))?
{
Bson::ObjectId(b) => b,
_ => {
return Err(FileCenterError::DocumentError(ValueAccessError::UnexpectedType));
},
};
let create_time = match document
.remove("create_time")
.ok_or(FileCenterError::DocumentError(ValueAccessError::NotPresent))?
{
Bson::DateTime(b) => b,
_ => {
return Err(FileCenterError::DocumentError(ValueAccessError::UnexpectedType));
},
};
let expire_at = match document.remove("expire_at") {
Some(expire_at) => match expire_at {
Bson::DateTime(b) => Some(b),
_ => {
return Err(FileCenterError::DocumentError(ValueAccessError::UnexpectedType));
},
},
None => None,
};
let mime_type = match document
.remove("mime_type")
.ok_or(FileCenterError::DocumentError(ValueAccessError::NotPresent))?
{
Bson::String(b) => Mime::from_str(&b)
.map_err(|_| FileCenterError::DocumentError(ValueAccessError::UnexpectedType))?,
_ => {
return Err(FileCenterError::DocumentError(ValueAccessError::UnexpectedType));
},
};
let file_size = document.get_i64("file_size")? as u64;
let file_name = match document
.remove("file_name")
.ok_or(FileCenterError::DocumentError(ValueAccessError::NotPresent))?
{
Bson::String(b) => b,
_ => {
return Err(FileCenterError::DocumentError(ValueAccessError::UnexpectedType));
},
};
let file_data = match document.remove("file_data") {
Some(file_data) => match file_data {
Bson::Binary(b) => FileData::Buffer(b.bytes),
_ => {
return Err(FileCenterError::DocumentError(ValueAccessError::UnexpectedType));
},
},
None => {
match document
.remove("chunk_id")
.ok_or(FileCenterError::DocumentError(ValueAccessError::NotPresent))?
{
Bson::ObjectId(_) => (),
_ => {
return Err(FileCenterError::DocumentError(
ValueAccessError::UnexpectedType,
));
},
};
let stream = self.open_download_stream(file_id).await?;
FileData::Stream(Box::new(stream))
},
};
Ok(FileItem {
file_id,
create_time,
expire_at,
mime_type,
file_size,
file_name,
file_data,
})
}
/// Tells whether a file item with the given `id` exists in the files collection.
///
/// # Errors
/// Returns the driver error if the lookup fails.
pub async fn check_file_item_exist(&self, id: ObjectId) -> Result<bool, FileCenterError> {
    // Only `_id` is fetched; the content of the item is irrelevant here.
    let mut id_only = FindOneOptions::default();
    id_only.projection = Some(file_exist_projection());

    let filter = doc! {
        "_id": id
    };
    let found = self.collections.files.find_one(Some(filter), Some(id_only)).await?;

    Ok(found.is_some())
}
/// Fetches the file item with the given `id`.
///
/// An item that carries `expire_at` is deleted from the files collection as soon as it is
/// fetched (a failure of that deletion is ignored); if it has already expired, `None` is
/// returned instead of the item.
///
/// # Errors
/// Returns the driver error if the lookup fails,
/// `FileCenterError::DocumentError(UnexpectedType)` if `expire_at` is not a date, or the
/// error of converting the document into a `FileItem`.
pub async fn get_file_item_by_id(
    &self,
    id: ObjectId,
) -> Result<Option<FileItem>, FileCenterError> {
    let collection_files = &self.collections.files;

    let mut options = FindOneOptions::default();
    options.projection = Some(file_item_projection());
    let filter = doc! {
        "_id": id
    };
    let document = match collection_files.find_one(Some(filter), Some(options)).await? {
        Some(document) => document,
        None => return Ok(None),
    };

    if let Some(raw_expire_at) = document.get("expire_at") {
        let expire_at = raw_expire_at
            .as_datetime()
            .ok_or(FileCenterError::DocumentError(ValueAccessError::UnexpectedType))?;

        // Best effort: a failed deletion does not prevent returning the item.
        let _ = collection_files
            .delete_one(
                doc! {
                    "_id": id
                },
                None,
            )
            .await;

        if DateTime::now() > *expire_at {
            return Ok(None);
        }
    }

    let file_item = self.create_file_item(document).await?;
    Ok(Some(file_item))
}
/// Removes one reference to the file item `file_id` and returns its size in bytes, or
/// `None` if no such item exists.
///
/// The `count` of the item is decremented; once it is zero or less, the item is deleted
/// and, if it has a `chunk_id`, so are its chunks (a failure of the chunk deletion is
/// ignored).
///
/// # Errors
/// Returns the driver error of the update or of the item deletion, or a document error if
/// `count` or `file_size` cannot be read.
pub async fn delete_file_item_by_id(
    &self,
    file_id: ObjectId,
) -> Result<Option<u64>, FileCenterError> {
    let collection_files = &self.collections.files;

    // Ask for the document as it looks after the decrement.
    let mut options = FindOneAndUpdateOptions::default();
    options.return_document = Some(ReturnDocument::After);
    options.projection = Some(file_item_delete_projection());

    let filter = doc! {
        "_id": file_id,
    };
    let release = doc! {
        "$inc": {
            "count": -1
        }
    };
    let updated = match collection_files.find_one_and_update(filter, release, Some(options)).await? {
        Some(updated) => updated,
        None => return Ok(None),
    };

    let count = updated.get_i32("count")?;
    let file_size = updated.get_i64("file_size")? as u64;
    if count > 0 {
        // Still referenced; keep the item.
        return Ok(Some(file_size));
    }

    collection_files
        .delete_one(
            doc! {
                "_id": file_id
            },
            None,
        )
        .await?;
    if updated.get("chunk_id").is_some() {
        // Best effort: leftover chunks do not make the deletion fail.
        let _ = self.delete_file_chunks(file_id).await;
    }

    Ok(Some(file_size))
}
}
impl FileCenter {
/// Deletes every chunk document whose `file_id` equals `file_id` and returns the delete
/// result reported by the driver.
///
/// # Errors
/// Returns the driver error if the deletion fails.
#[inline]
async fn delete_file_chunks(&self, file_id: ObjectId) -> Result<DeleteResult, FileCenterError> {
Ok(self
.collections
.files_chunks
.delete_many(
doc! {
"file_id": file_id
},
None,
)
.await?)
}
}
impl FileCenter {
/// Reads `source` to its end and stores it as chunks of the file `file_id`.
///
/// Every chunk holds up to `file_size_threshold` bytes; chunk numbers start at 0. An empty
/// source is stored as a single empty chunk. Chunks inserted before a failure are not
/// removed here.
///
/// Returns the `_id` of the last chunk that was inserted.
///
/// # Errors
/// Returns the I/O error of reading `source` (interrupted reads are retried), the driver
/// error of an insertion, or `FileCenterError::DocumentError(UnexpectedType)` if an
/// inserted id is not an object ID.
async fn upload_from_stream(
    &self,
    file_id: ObjectId,
    mut source: impl AsyncRead + Unpin,
) -> Result<ObjectId, FileCenterError> {
    let chunks = &self.collections.files_chunks;
    let chunk_capacity = self.file_size_threshold as usize;
    let mut buffer = vec![0u8; chunk_capacity];
    let mut next_index = 0i64;
    let mut last_chunk_id: Option<ObjectId> = None;

    loop {
        // Fill the buffer until it is full or the source is exhausted.
        let mut filled = 0;
        loop {
            match source.read(&mut buffer[filled..]).await {
                Ok(0) => break,
                Ok(read) => {
                    filled += read;
                    if filled == chunk_capacity {
                        break;
                    }
                },
                Err(ref e) if e.kind() == ErrorKind::Interrupted => {},
                Err(e) => return Err(e.into()),
            }
        }
        if filled == 0 {
            break;
        }

        let inserted = chunks
            .insert_one(chunk_document(file_id, next_index, buffer[..filled].to_vec()), None)
            .await?;
        let chunk_id = inserted
            .inserted_id
            .as_object_id()
            .ok_or(FileCenterError::DocumentError(ValueAccessError::UnexpectedType))?;
        last_chunk_id = Some(chunk_id);
        next_index += 1;
    }

    if let Some(chunk_id) = last_chunk_id {
        return Ok(chunk_id);
    }

    // Nothing was read: represent the empty file by one empty chunk.
    let inserted = chunks.insert_one(chunk_document(file_id, 0, Vec::new()), None).await?;
    inserted
        .inserted_id
        .as_object_id()
        .ok_or(FileCenterError::DocumentError(ValueAccessError::UnexpectedType))
}
pub async fn put_file_by_path<P: AsRef<Path>, S: Into<String>>(
&self,
file_path: P,
file_name: Option<S>,
mime_type: Option<Mime>,
) -> Result<ObjectId, FileCenterError> {
let file_path = file_path.as_ref();
let (hash_1, hash_2, hash_3, hash_4) = get_hash_by_path(file_path).await?;
let mut options = FindOneAndUpdateOptions::default();
options.return_document = Some(ReturnDocument::After);
options.projection = Some(file_exist_projection());
let result = self
.collections
.files
.find_one_and_update(
doc! {
"hash_1": hash_1,
"hash_2": hash_2,
"hash_3": hash_3,
"hash_4": hash_4,
},
doc! {
"$inc": {
"count": 1
}
},
Some(options),
)
.await?;
match result {
Some(result) => Ok(result.get_object_id("_id")?),
None => {
let file_name = match file_name {
Some(file_name) => file_name.into(),
None => file_path.file_name().unwrap().to_str().unwrap().to_string(),
};
let mut file = File::open(file_path).await?;
let metadata = file.metadata().await?;
let file_size = metadata.len();
let file_id = ObjectId::new();
let mut file_item_raw = doc! {
"_id": file_id,
"hash_1": hash_1,
"hash_2": hash_2,
"hash_3": hash_3,
"hash_4": hash_4,
"file_size": file_size as i64,
"file_name": file_name,
"count": 1i32
};
if file_size > self.file_size_threshold as u64 {
let chunk_id = match self.upload_from_stream(file_id, file).await {
Ok(id) => id,
Err(err) => {
if self.delete_file_chunks(file_id).await.is_err() {
}
return Err(err);
},
};
file_item_raw.insert("chunk_id", chunk_id);
} else {
let mut file_data = Vec::with_capacity(file_size as usize);
file.read_to_end(&mut file_data).await?;
file_item_raw.insert(
"file_data",
Bson::Binary(Binary {
subtype: BinarySubtype::Generic,
bytes: file_data,
}),
);
drop(file);
}
let mime_type = match mime_type {
Some(mime_type) => mime_type,
None => get_mime_by_path(file_path),
};
file_item_raw.insert("mime_type", mime_type.as_ref());
file_item_raw.insert("create_time", DateTime::now());
self.collections.files.insert_one(file_item_raw, None).await?;
Ok(file_id)
},
}
}
pub async fn put_file_by_path_temporarily<P: AsRef<Path>, S: Into<String>>(
&self,
file_path: P,
file_name: Option<S>,
mime_type: Option<Mime>,
) -> Result<ObjectId, FileCenterError> {
let file_path = file_path.as_ref();
let file_name = match file_name {
Some(file_name) => file_name.into(),
None => file_path.file_name().unwrap().to_str().unwrap().to_string(),
};
let mut file = File::open(file_path).await?;
let metadata = file.metadata().await?;
let file_size = metadata.len();
let file_id = ObjectId::new();
let mut file_item_raw = doc! {
"_id": file_id,
"file_size": file_size as i64,
"file_name": file_name,
"count": 1i32
};
let is_stream = file_size > self.file_size_threshold as u64;
if is_stream {
let chunk_id = match self.upload_from_stream(file_id, file).await {
Ok(id) => id,
Err(err) => {
if self.delete_file_chunks(file_id).await.is_err() {
}
return Err(err);
},
};
file_item_raw.insert("chunk_id", chunk_id);
} else {
let mut file_data = Vec::with_capacity(file_size as usize);
file.read_to_end(&mut file_data).await?;
file_item_raw.insert(
"file_data",
Bson::Binary(Binary {
subtype: BinarySubtype::Generic, bytes: file_data
}),
);
drop(file);
}
let mime_type = match mime_type {
Some(mime_type) => mime_type,
None => get_mime_by_path(file_path),
};
file_item_raw.insert("mime_type", mime_type.as_ref());
let now = DateTime::now();
let expire = DateTime::from_millis(now.timestamp_millis() + TEMPORARY_LIFE_TIME);
let expire_chunks =
DateTime::from_millis(now.timestamp_millis() + TEMPORARY_CHUNK_LIFE_TIME);
file_item_raw.insert("create_time", now);
file_item_raw.insert("expire_at", expire);
if is_stream {
self.collections
.files_chunks
.update_many(
doc! {
"file_id": file_id
},
doc! {
"$set": {
"expire_at": expire_chunks
}
},
None,
)
.await?;
}
self.collections.files.insert_one(file_item_raw, None).await?;
Ok(file_id)
}
}
impl FileCenter {
/// Stores `source` as chunks of the file `file_id`.
///
/// Every chunk holds up to `file_size_threshold` bytes; chunk numbers start at 0. An empty
/// `source` is stored as a single empty chunk. Chunks inserted before a failure are not
/// removed here.
///
/// Returns the `_id` of the last chunk that was inserted.
///
/// # Errors
/// Returns the driver error of an insertion, or
/// `FileCenterError::DocumentError(UnexpectedType)` if an inserted id is not an object ID.
async fn upload_from_buffer(
    &self,
    file_id: ObjectId,
    source: &[u8],
) -> Result<ObjectId, FileCenterError> {
    let chunks = &self.collections.files_chunks;
    let chunk_size = self.file_size_threshold as usize;

    let mut last_chunk_id: Option<ObjectId> = None;
    let mut index = 0i64;
    for piece in source.chunks(chunk_size) {
        let inserted =
            chunks.insert_one(chunk_document(file_id, index, piece.to_vec()), None).await?;
        let chunk_id = inserted
            .inserted_id
            .as_object_id()
            .ok_or(FileCenterError::DocumentError(ValueAccessError::UnexpectedType))?;
        last_chunk_id = Some(chunk_id);
        index += 1;
    }

    if let Some(chunk_id) = last_chunk_id {
        return Ok(chunk_id);
    }

    // Empty buffer: represent the empty file by one empty chunk.
    let inserted = chunks.insert_one(chunk_document(file_id, 0, Vec::new()), None).await?;
    inserted
        .inserted_id
        .as_object_id()
        .ok_or(FileCenterError::DocumentError(ValueAccessError::UnexpectedType))
}
/// Stores the content of `buffer` under `file_name` and returns the id of its file item.
///
/// If a file item with the same four hash values already exists, its `count` is
/// incremented and its id is returned without storing anything. Otherwise a new item is
/// inserted: content larger than the file size threshold is stored as chunks, smaller
/// content inline in `file_data`. `mime_type` defaults to `DEFAULT_MIME_TYPE`.
///
/// # Errors
/// Returns the driver or document error of the database operations. When uploading the
/// chunks fails, the chunks already written are deleted on a best-effort basis.
pub async fn put_file_by_buffer<B: AsRef<[u8]> + Into<Vec<u8>>, S: Into<String>>(
    &self,
    buffer: B,
    file_name: S,
    mime_type: Option<Mime>,
) -> Result<ObjectId, FileCenterError> {
    let collection_files = &self.collections.files;
    let (hash_1, hash_2, hash_3, hash_4) = get_hash_by_buffer(buffer.as_ref());

    let mut options = FindOneAndUpdateOptions::default();
    options.return_document = Some(ReturnDocument::After);
    options.projection = Some(file_exist_projection());
    let same_content = doc! {
        "hash_1": hash_1,
        "hash_2": hash_2,
        "hash_3": hash_3,
        "hash_4": hash_4,
    };
    let add_reference = doc! {
        "$inc": {
            "count": 1
        }
    };
    let existing =
        collection_files.find_one_and_update(same_content, add_reference, Some(options)).await?;
    if let Some(existing) = existing {
        // Same content already stored: the incremented `count` records the new reference.
        return Ok(existing.get_object_id("_id")?);
    }

    let bytes: Vec<u8> = buffer.into();
    let file_name = file_name.into();
    let file_size = bytes.len();
    let file_id = ObjectId::new();
    let mut file_item_raw = doc! {
        "_id": file_id,
        "hash_1": hash_1,
        "hash_2": hash_2,
        "hash_3": hash_3,
        "hash_4": hash_4,
        "file_size": file_size as i64,
        "file_name": file_name,
        "count": 1i32
    };

    if file_size > self.file_size_threshold as usize {
        match self.upload_from_buffer(file_id, &bytes).await {
            Ok(chunk_id) => {
                file_item_raw.insert("chunk_id", chunk_id);
            },
            Err(err) => {
                // Best effort: remove the chunks written before the failure.
                let _ = self.delete_file_chunks(file_id).await;
                return Err(err);
            },
        }
        drop(bytes);
    } else {
        let inline_data = Binary {
            subtype: BinarySubtype::Generic,
            bytes,
        };
        file_item_raw.insert("file_data", Bson::Binary(inline_data));
    }

    let mime_type = mime_type.unwrap_or(DEFAULT_MIME_TYPE);
    file_item_raw.insert("mime_type", mime_type.as_ref());
    file_item_raw.insert("create_time", DateTime::now());
    collection_files.insert_one(file_item_raw, None).await?;
    Ok(file_id)
}
/// Stores the content of `buffer` under `file_name` as a temporary file item and returns
/// its id.
///
/// No hash is computed and no existing item is reused. The item gets an `expire_at` of
/// now plus `TEMPORARY_LIFE_TIME` milliseconds; content larger than the file size
/// threshold is stored as chunks, which get an `expire_at` of now plus
/// `TEMPORARY_CHUNK_LIFE_TIME` milliseconds. `mime_type` defaults to `DEFAULT_MIME_TYPE`.
///
/// # Errors
/// Returns the driver error of the database operations. When uploading the chunks fails,
/// the chunks already written are deleted on a best-effort basis.
pub async fn put_file_by_buffer_temporarily<B: AsRef<[u8]> + Into<Vec<u8>>, S: Into<String>>(
    &self,
    buffer: B,
    file_name: S,
    mime_type: Option<Mime>,
) -> Result<ObjectId, FileCenterError> {
    let bytes: Vec<u8> = buffer.into();
    let file_name = file_name.into();
    let file_size = bytes.len();
    let file_id = ObjectId::new();
    let mut file_item_raw = doc! {
        "_id": file_id,
        "file_size": file_size as i64,
        "file_name": file_name,
        "count": 1i32
    };

    let is_stream = file_size > self.file_size_threshold as usize;
    if is_stream {
        match self.upload_from_buffer(file_id, &bytes).await {
            Ok(chunk_id) => {
                file_item_raw.insert("chunk_id", chunk_id);
            },
            Err(err) => {
                // Best effort: remove the chunks written before the failure.
                let _ = self.delete_file_chunks(file_id).await;
                return Err(err);
            },
        }
        drop(bytes);
    } else {
        let inline_data = Binary {
            subtype: BinarySubtype::Generic,
            bytes,
        };
        file_item_raw.insert("file_data", Bson::Binary(inline_data));
    }

    let mime_type = mime_type.unwrap_or(DEFAULT_MIME_TYPE);
    file_item_raw.insert("mime_type", mime_type.as_ref());

    let now = DateTime::now();
    let now_millis = now.timestamp_millis();
    let expire = DateTime::from_millis(now_millis + TEMPORARY_LIFE_TIME);
    let expire_chunks = DateTime::from_millis(now_millis + TEMPORARY_CHUNK_LIFE_TIME);
    file_item_raw.insert("create_time", now);
    file_item_raw.insert("expire_at", expire);

    if is_stream {
        // The chunks were written without an expiry; stamp them before the item appears.
        let of_this_file = doc! {
            "file_id": file_id
        };
        let set_expiry = doc! {
            "$set": {
                "expire_at": expire_chunks
            }
        };
        self.collections.files_chunks.update_many(of_this_file, set_expiry, None).await?;
    }

    self.collections.files.insert_one(file_item_raw, None).await?;
    Ok(file_id)
}
}
impl FileCenter {
/// Stores a stream as chunks of the file `file_id` while hashing its content.
///
/// `first_chunk_plus_one` must hold the first `file_size_threshold + 1` bytes of the
/// content: the first `file_size_threshold` bytes become chunk 0 and the extra byte becomes
/// the first byte of chunk 1. The remaining content is read from `source`. Chunks inserted
/// before a failure are not removed here.
///
/// Returns the `_id` of the last inserted chunk, the total number of bytes stored and the
/// four hash values of the content.
///
/// # Panics
/// Panics if `first_chunk_plus_one` holds fewer than `file_size_threshold + 1` bytes.
///
/// # Errors
/// Returns the I/O error of reading `source` (interrupted reads are retried), the driver
/// error of an insertion, or `FileCenterError::DocumentError(UnexpectedType)` if an
/// inserted id is not an object ID.
async fn upload_from_stream_and_hash(
    &self,
    file_id: ObjectId,
    mut first_chunk_plus_one: Vec<u8>,
    mut source: impl AsyncRead + Unpin,
) -> Result<(ObjectId, i64, (i64, i64, i64, i64)), FileCenterError> {
    let collection_files_chunks = &self.collections.files_chunks;
    let buffer_size = self.file_size_threshold as usize;
    let mut buffer: Vec<u8> = vec![0u8; buffer_size];
    // Carry the extra byte over into the buffer of the next chunk.
    buffer[0] = first_chunk_plus_one[buffer_size];
    let mut hasher = Hasher::new();
    hasher.update(&first_chunk_plus_one[..buffer_size]);
    // Cut off the carried byte; `truncate` does this without any `unsafe`.
    first_chunk_plus_one.truncate(buffer_size);
    let result = collection_files_chunks
        .insert_one(chunk_document(file_id, 0, first_chunk_plus_one), None)
        .await?;
    let mut inserted_id = match result.inserted_id.as_object_id() {
        Some(id) => id,
        None => {
            return Err(FileCenterError::DocumentError(ValueAccessError::UnexpectedType));
        },
    };
    let mut n = 1i64;
    // The buffer already holds the carried byte.
    let mut cc = 1;
    let mut file_size = buffer_size as i64;
    loop {
        // Fill the buffer until it is full or the source is exhausted.
        loop {
            let c = match source.read(&mut buffer[cc..]).await {
                Ok(0) => break,
                Ok(c) => c,
                Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Err(e.into()),
            };
            cc += c;
            if cc == buffer_size {
                break;
            }
        }
        if cc == 0 {
            break;
        }
        let chunk = &buffer[..cc];
        hasher.update(chunk);
        let result = collection_files_chunks
            .insert_one(chunk_document(file_id, n, chunk.to_vec()), None)
            .await?;
        inserted_id = match result.inserted_id.as_object_id() {
            Some(id) => id,
            None => {
                return Err(FileCenterError::DocumentError(ValueAccessError::UnexpectedType));
            },
        };
        n += 1;
        file_size += cc as i64;
        cc = 0;
    }
    let hash = separate_hash(&hasher.finalize());
    Ok((inserted_id, file_size, hash))
}
/// Stores a stream as chunks of the file `file_id` without hashing its content.
///
/// `first_chunk_plus_one` must hold the first `file_size_threshold + 1` bytes of the
/// content: the first `file_size_threshold` bytes become chunk 0 and the extra byte becomes
/// the first byte of chunk 1. The remaining content is read from `source`. Chunks inserted
/// before a failure are not removed here.
///
/// Returns the `_id` of the last inserted chunk and the total number of bytes stored.
///
/// # Panics
/// Panics if `first_chunk_plus_one` holds fewer than `file_size_threshold + 1` bytes.
///
/// # Errors
/// Returns the I/O error of reading `source` (interrupted reads are retried), the driver
/// error of an insertion, or `FileCenterError::DocumentError(UnexpectedType)` if an
/// inserted id is not an object ID.
async fn upload_from_stream_and_no_hash(
    &self,
    file_id: ObjectId,
    mut first_chunk_plus_one: Vec<u8>,
    mut source: impl AsyncRead + Unpin,
) -> Result<(ObjectId, i64), FileCenterError> {
    let collection_files_chunks = &self.collections.files_chunks;
    let buffer_size = self.file_size_threshold as usize;
    let mut buffer: Vec<u8> = vec![0u8; buffer_size];
    // Carry the extra byte over into the buffer of the next chunk.
    buffer[0] = first_chunk_plus_one[buffer_size];
    // Cut off the carried byte; `truncate` does this without any `unsafe`.
    first_chunk_plus_one.truncate(buffer_size);
    let result = collection_files_chunks
        .insert_one(chunk_document(file_id, 0, first_chunk_plus_one), None)
        .await?;
    let mut inserted_id = match result.inserted_id.as_object_id() {
        Some(id) => id,
        None => {
            return Err(FileCenterError::DocumentError(ValueAccessError::UnexpectedType));
        },
    };
    let mut n = 1i64;
    // The buffer already holds the carried byte.
    let mut cc = 1;
    let mut file_size = buffer_size as i64;
    loop {
        // Fill the buffer until it is full or the source is exhausted.
        loop {
            let c = match source.read(&mut buffer[cc..]).await {
                Ok(0) => break,
                Ok(c) => c,
                Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Err(e.into()),
            };
            cc += c;
            if cc == buffer_size {
                break;
            }
        }
        if cc == 0 {
            break;
        }
        let chunk = &buffer[..cc];
        let result = collection_files_chunks
            .insert_one(chunk_document(file_id, n, chunk.to_vec()), None)
            .await?;
        inserted_id = match result.inserted_id.as_object_id() {
            Some(id) => id,
            None => {
                return Err(FileCenterError::DocumentError(ValueAccessError::UnexpectedType));
            },
        };
        n += 1;
        file_size += cc as i64;
        cc = 0;
    }
    Ok((inserted_id, file_size))
}
/// Stores the content read from `reader` under `file_name` and returns the id of its file
/// item.
///
/// Up to `file_size_threshold + 1` bytes are read first. If that many bytes are available
/// the content is larger than the threshold and is stored as chunks while being hashed;
/// otherwise it is hashed and kept inline in `file_data`.
///
/// If a file item with the same four hash values already exists, its `count` is
/// incremented, the chunks just written (if any) are deleted on a best-effort basis, and
/// the id of the existing item is returned. Otherwise a new item is inserted. `mime_type`
/// defaults to `DEFAULT_MIME_TYPE`.
///
/// # Errors
/// Returns the I/O error of reading `reader` (interrupted reads are retried) or the driver
/// or document error of the database operations. When uploading the chunks fails, the
/// chunks already written are deleted on a best-effort basis.
pub async fn put_file_by_reader<R: AsyncRead + Unpin, S: Into<String>>(
    &self,
    mut reader: R,
    file_name: S,
    mime_type: Option<Mime>,
) -> Result<ObjectId, FileCenterError> {
    // One byte more than the threshold tells "larger than the threshold" apart from
    // "exactly the threshold".
    let buffer_size = self.file_size_threshold as usize + 1;
    let mut file_data = vec![0u8; buffer_size];
    let mut cc = 0;
    loop {
        let c = match reader.read(&mut file_data[cc..]).await {
            Ok(0) => break,
            Ok(c) => c,
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e.into()),
        };
        cc += c;
        if cc == buffer_size {
            break;
        }
    }
    let cc = cc as i64;
    let file_name = file_name.into();
    let file_id = ObjectId::new();
    let mut file_item_raw = doc! {
        "_id": file_id,
        "file_name": file_name,
        "count": 1i32
    };
    let is_stream = cc == buffer_size as i64;
    let (hash_1, hash_2, hash_3, hash_4) = if is_stream {
        let (chunk_id, file_size, hash) =
            match self.upload_from_stream_and_hash(file_id, file_data, reader).await {
                Ok(id) => id,
                Err(err) => {
                    // Best effort: remove the chunks written before the failure.
                    let _ = self.delete_file_chunks(file_id).await;
                    return Err(err);
                },
            };
        file_item_raw.insert("file_size", file_size);
        file_item_raw.insert("chunk_id", chunk_id);
        hash
    } else {
        // Keep only the bytes that were actually read; `truncate` needs no `unsafe`.
        file_data.truncate(cc as usize);
        let hash = get_hash_by_buffer(&file_data);
        file_item_raw.insert("file_size", cc);
        file_item_raw.insert(
            "file_data",
            Bson::Binary(Binary {
                subtype: BinarySubtype::Generic,
                bytes: file_data,
            }),
        );
        hash
    };
    let mut options = FindOneAndUpdateOptions::default();
    options.return_document = Some(ReturnDocument::After);
    options.projection = Some(file_exist_projection());
    let result = self
        .collections
        .files
        .find_one_and_update(
            doc! {
                "hash_1": hash_1,
                "hash_2": hash_2,
                "hash_3": hash_3,
                "hash_4": hash_4,
            },
            doc! {
                "$inc": {
                    "count": 1
                }
            },
            Some(options),
        )
        .await?;
    match result {
        Some(result) => {
            // Same content already stored: the chunks just written are redundant.
            if is_stream {
                let _ = self.delete_file_chunks(file_id).await;
            }
            Ok(result.get_object_id("_id")?)
        },
        None => {
            file_item_raw.insert("hash_1", hash_1);
            file_item_raw.insert("hash_2", hash_2);
            file_item_raw.insert("hash_3", hash_3);
            file_item_raw.insert("hash_4", hash_4);
            let mime_type = mime_type.unwrap_or(DEFAULT_MIME_TYPE);
            file_item_raw.insert("mime_type", mime_type.as_ref());
            file_item_raw.insert("create_time", DateTime::now());
            self.collections.files.insert_one(file_item_raw, None).await?;
            Ok(file_id)
        },
    }
}
/// Stores the content read from `reader` under `file_name` as a temporary file item and
/// returns its id.
///
/// Up to `file_size_threshold + 1` bytes are read first. If that many bytes are available
/// the content is larger than the threshold and is stored as chunks; otherwise it is kept
/// inline in `file_data`. No hash is computed and no existing item is reused.
///
/// The item gets an `expire_at` of now plus `TEMPORARY_LIFE_TIME` milliseconds; its chunks
/// (if any) get an `expire_at` of now plus `TEMPORARY_CHUNK_LIFE_TIME` milliseconds.
/// `mime_type` defaults to `DEFAULT_MIME_TYPE`.
///
/// # Errors
/// Returns the I/O error of reading `reader` (interrupted reads are retried) or the driver
/// error of the database operations. When uploading the chunks fails, the chunks already
/// written are deleted on a best-effort basis.
pub async fn put_file_by_reader_temporarily<R: AsyncRead + Unpin, S: Into<String>>(
    &self,
    mut reader: R,
    file_name: S,
    mime_type: Option<Mime>,
) -> Result<ObjectId, FileCenterError> {
    // One byte more than the threshold tells "larger than the threshold" apart from
    // "exactly the threshold".
    let buffer_size = self.file_size_threshold as usize + 1;
    let mut file_data = vec![0u8; buffer_size];
    let mut cc = 0;
    loop {
        let c = match reader.read(&mut file_data[cc..]).await {
            Ok(0) => break,
            Ok(c) => c,
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e.into()),
        };
        cc += c;
        if cc == buffer_size {
            break;
        }
    }
    let cc = cc as i64;
    let file_name = file_name.into();
    let file_id = ObjectId::new();
    let mut file_item_raw = doc! {
        "_id": file_id,
        "file_name": file_name,
        "count": 1i32
    };
    let is_stream = cc == buffer_size as i64;
    if is_stream {
        let (chunk_id, file_size) =
            match self.upload_from_stream_and_no_hash(file_id, file_data, reader).await {
                Ok(id) => id,
                Err(err) => {
                    // Best effort: remove the chunks written before the failure.
                    let _ = self.delete_file_chunks(file_id).await;
                    return Err(err);
                },
            };
        file_item_raw.insert("file_size", file_size);
        file_item_raw.insert("chunk_id", chunk_id);
    } else {
        // Keep only the bytes that were actually read; `truncate` needs no `unsafe`.
        file_data.truncate(cc as usize);
        file_item_raw.insert("file_size", cc);
        file_item_raw.insert(
            "file_data",
            Bson::Binary(Binary {
                subtype: BinarySubtype::Generic,
                bytes: file_data,
            }),
        );
    };
    let mime_type = mime_type.unwrap_or(DEFAULT_MIME_TYPE);
    file_item_raw.insert("mime_type", mime_type.as_ref());
    let now = DateTime::now();
    let expire = DateTime::from_millis(now.timestamp_millis() + TEMPORARY_LIFE_TIME);
    let expire_chunks =
        DateTime::from_millis(now.timestamp_millis() + TEMPORARY_CHUNK_LIFE_TIME);
    file_item_raw.insert("create_time", now);
    file_item_raw.insert("expire_at", expire);
    if is_stream {
        // The chunks were written without an expiry; stamp them before the item appears.
        self.collections
            .files_chunks
            .update_many(
                doc! {
                    "file_id": file_id
                },
                doc! {
                    "$set": {
                        "expire_at": expire_chunks
                    }
                },
                None,
            )
            .await?;
    }
    self.collections.files.insert_one(file_item_raw, None).await?;
    Ok(file_id)
}
}
impl FileCenter {
/// Removes inconsistent leftovers from the files and chunks collections in three passes:
///
/// 1. file items whose `chunk_id` references no existing chunk are deleted;
/// 2. file items whose `count` is zero or less are deleted together with their chunks;
/// 3. chunks whose `file_id` references no existing file item are deleted.
///
/// # Errors
/// Returns the first driver or document error; passes after the failing one are not run.
pub async fn clear_garbage(&self) -> Result<(), FileCenterError> {
    let collection_files = &self.collections.files;
    let collection_files_chunks = &self.collections.files_chunks;

    // Pass 1: file items that point to a missing chunk.
    {
        let items_without_chunk = [
            doc! {
                "$match": {
                    "chunk_id": {
                        "$exists": true
                    }
                }
            },
            doc! {
                "$lookup": {
                    "from": COLLECTION_FILES_CHUNKS_NAME,
                    "localField": "chunk_id",
                    "foreignField": "_id",
                    "as": "chunk"
                }
            },
            doc! {
                "$match": {
                    "chunk": []
                }
            },
            doc! {
                "$project": {
                    "_id": 1
                }
            },
        ];
        let mut cursor = collection_files.aggregate(items_without_chunk, None).await?;
        let mut broken_item_ids = Vec::new();
        while let Some(document) = cursor.try_next().await? {
            broken_item_ids.push(document.get_object_id("_id")?);
        }
        if !broken_item_ids.is_empty() {
            let filter = doc! {
                "_id": {
                    "$in": broken_item_ids
                }
            };
            collection_files.delete_many(filter, None).await?;
        }
    }

    // Pass 2: file items that are no longer referenced, and their chunks.
    {
        let unreferenced = doc! {
            "count": {
                "$lte": 0
            }
        };
        let mut cursor = collection_files.find(unreferenced, None).await?;
        let mut unreferenced_ids = Vec::new();
        while let Some(document) = cursor.try_next().await? {
            unreferenced_ids.push(document.get_object_id("_id")?);
        }
        if !unreferenced_ids.is_empty() {
            let items_filter = doc! {
                "_id": {
                    "$in": unreferenced_ids.clone()
                }
            };
            collection_files.delete_many(items_filter, None).await?;
            let chunks_filter = doc! {
                "file_id": {
                    "$in": unreferenced_ids
                }
            };
            collection_files_chunks.delete_many(chunks_filter, None).await?;
        }
    }

    // Pass 3: chunks that belong to no file item.
    {
        let chunks_without_item = [
            doc! {
                "$lookup": {
                    "from": COLLECTION_FILES_NAME,
                    "localField": "file_id",
                    "foreignField": "_id",
                    "as": "item"
                }
            },
            doc! {
                "$match": {
                    "item": []
                }
            },
            doc! {
                "$group": {
                    "_id": null,
                    "file_ids": {
                        "$addToSet": "$file_id"
                    }
                }
            },
            doc! {
                "$unwind": "$file_ids"
            },
            doc! {
                "$project": {
                    "file_id": "$file_ids"
                }
            },
        ];
        let mut cursor = collection_files_chunks.aggregate(chunks_without_item, None).await?;
        let mut orphan_file_ids = Vec::new();
        while let Some(document) = cursor.try_next().await? {
            orphan_file_ids.push(document.get_object_id("file_id")?);
        }
        if !orphan_file_ids.is_empty() {
            let filter = doc! {
                "file_id": {
                    "$in": orphan_file_ids
                }
            };
            collection_files_chunks.delete_many(filter, None).await?;
        }
    }

    Ok(())
}
}
impl FileCenter {
/// Returns a reference to the underlying `Database` handle.
///
/// NOTE(review): this file does not state why the function is `unsafe`. Presumably direct
/// database access lets callers change the file center collections behind this type's
/// back — confirm, then replace the lint exemption below with a `# Safety` section.
#[allow(clippy::missing_safety_doc)]
#[inline]
pub unsafe fn database(&self) -> &Database {
&self.db
}
}
impl FileCenter {
/// Decrypts an ID token back into the object ID it was created from.
///
/// # Errors
/// Returns `FileCenterError::IDTokenError` if the token cannot be decrypted or if the
/// decrypted data is not exactly 12 bytes long.
pub fn decrypt_id_token<S: AsRef<str>>(
    &self,
    id_token: S,
) -> Result<ObjectId, FileCenterError> {
    let decrypted = self
        .short_crypt
        .decrypt_url_component(id_token)
        .map_err(FileCenterError::IDTokenError)?;

    if decrypted.len() != 12 {
        return Err(FileCenterError::IDTokenError("ID needs to be 12 bytes"));
    }

    // The length was checked above, so the copy cannot panic.
    let mut id_bytes = [0u8; 12];
    id_bytes.copy_from_slice(&decrypted);
    Ok(ObjectId::from_bytes(id_bytes))
}
/// Encrypts the bytes of `id` into a URL-component-safe ID token.
#[inline]
pub fn encrypt_id(&self, id: ObjectId) -> IDToken {
    self.short_crypt.encrypt_to_url_component(&id.bytes())
}
/// Encrypts the bytes of `id` into an ID token, pushes it onto `buffer` and returns the
/// resulting string.
#[inline]
pub fn encrypt_id_to_buffer(&self, id: ObjectId, buffer: String) -> String {
    self.short_crypt.encrypt_to_url_component_and_push_to_string(&id.bytes(), buffer)
}
}