use bson::{self, Bson, bson, doc, oid};
use bson::spec::BinarySubtype;
use chrono::{DateTime, Utc};
use md5::{Md5, Digest};
use hex;
use Error::{self, ArgumentError, OperationError, PoisonLockError};
use Result;
use super::Store;
use coll::options::IndexOptions;
use std::{cmp, io, thread};
use std::io::Write;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::sync::atomic::{AtomicIsize, Ordering};
/// The default size of an individual chunk, in bytes (255 KiB).
pub const DEFAULT_CHUNK_SIZE: i32 = 255 * 1024;
/// One megabyte; used to cap the number of bytes held by in-flight chunk writes.
pub const MEGABYTE: usize = 1024 * 1024;
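/// The access mode of a GridFS file stream.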
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Mode {
    /// The file is closed.
    Closed,
    /// The file is open for reading.
    Read,
    /// The file is open for writing.
    Write,
}
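// A shareable slot for the first error reported by a background operation.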
#[derive(Debug)]
struct InnerError {
    inner: Option<Error>,
}
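/// A readable or writable file stream within GridFS.
///
/// A minimal usage sketch, assuming a connected GridFS `Store` named `gfs`
/// (the `create` constructor and error handling here are illustrative):
///
/// ```ignore
/// use std::io::Write;
///
/// let mut file = gfs.create("sample.txt".to_owned())?;
/// file.write_all(b"hello gridfs")?;
/// // Closing finalizes the MD5 digest and stores the metadata document.
/// file.close()?;
/// ```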
#[derive(Debug)]
pub struct File {
    // The file lock, coordinating buffered writes with background chunk inserts.
    mutex: Arc<Mutex<()>>,
    // Signaled whenever a background chunk write completes.
    condvar: Arc<Condvar>,
    // The GridFS store this file belongs to.
    gfs: Store,
    // The index of the next chunk to read or write.
    chunk_num: i32,
    // The current read offset into the file.
    offset: i64,
    // The number of chunk writes still in flight.
    wpending: Arc<AtomicIsize>,
    // Bytes buffered until a full chunk can be inserted.
    wbuf: Vec<u8>,
    // A running MD5 digest of everything written so far.
    wsum: Md5,
    // Bytes read from chunks but not yet returned to the caller.
    rbuf: Vec<u8>,
    // The next chunk, prefetched by a background thread.
    rcache: Option<Arc<Mutex<CachedChunk>>>,
    // The mode the file was opened in.
    mode: Mode,
    // The first error reported by a background operation, if any.
    err: Arc<RwLock<InnerError>>,
    /// The file's metadata document.
    pub doc: GfsFile,
}
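/// The metadata describing a GridFS file; one document in the `files` collection.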
#[derive(Debug)]
pub struct GfsFile {
    // The total length of the file, in bytes.
    len: i64,
    // The hex-encoded MD5 digest of the file's contents.
    md5: String,
    pub id: oid::ObjectId,
    pub chunk_size: i32,
    pub aliases: Vec<String>,
    pub name: Option<String>,
    pub upload_date: Option<DateTime<Utc>>,
    pub content_type: Option<String>,
    pub metadata: Option<Vec<u8>>,
}
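// A chunk prefetched by a background thread, along with any error hit while fetching it.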
#[derive(Debug)]
struct CachedChunk {
    // The index of the prefetched chunk.
    n: i32,
    // The chunk's binary payload.
    data: Vec<u8>,
    // The error, if any, raised while fetching the chunk.
    err: Option<Error>,
}
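// Let a `File` be used directly as its underlying `GfsFile` metadata document.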
impl Deref for File {
    type Target = GfsFile;

    fn deref(&self) -> &Self::Target {
        &self.doc
    }
}

impl DerefMut for File {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.doc
    }
}
impl File {
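    /// Creates a new file stream in the given store, with the given object id and mode.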
    pub fn new(gfs: Store, id: oid::ObjectId, mode: Mode) -> File {
        File::with_gfs_file(gfs, GfsFile::new(id), mode)
    }
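    /// Creates a named file stream with the given object id and mode.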
    pub fn with_name(gfs: Store, name: String, id: oid::ObjectId, mode: Mode) -> File {
        File::with_gfs_file(gfs, GfsFile::with_name(name, id), mode)
    }
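    /// Creates a read-only file stream described by an existing metadata document.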
    pub fn with_doc(gfs: Store, doc: bson::Document) -> File {
        File::with_gfs_file(gfs, GfsFile::with_doc(doc), Mode::Read)
    }
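    // Wraps a metadata document in a new file stream.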
    fn with_gfs_file(gfs: Store, file: GfsFile, mode: Mode) -> File {
        File {
            mutex: Arc::new(Mutex::new(())),
            condvar: Arc::new(Condvar::new()),
            mode,
            gfs,
            chunk_num: 0,
            offset: 0,
            wpending: Arc::new(AtomicIsize::new(0)),
            wbuf: Vec::new(),
            wsum: Md5::new(),
            rbuf: Vec::new(),
            rcache: None,
            doc: file,
            err: Arc::new(RwLock::new(InnerError { inner: None })),
        }
    }
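    /// Returns the length of the file, in bytes.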
    pub fn len(&self) -> i64 {
        self.len
    }
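    /// Returns `true` if the file contains no bytes.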
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
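    /// Returns the description of any error raised by a background write, if one occurred.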
    pub fn err_description(&self) -> Result<Option<String>> {
        let err = self.err.read()?;
        Ok(err.inner.as_ref().map(|err| err.to_string()))
    }
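    /// Ensures the file is open in the expected mode, returning an `ArgumentError` otherwise.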
    pub fn assert_mode(&self, mode: Mode) -> Result<()> {
        if self.mode == mode {
            Ok(())
        } else {
            let message = match self.mode {
                Mode::Read => "File is open for reading.",
                Mode::Write => "File is open for writing.",
                Mode::Closed => "File is closed.",
            };
            Err(ArgumentError(String::from(message)))
        }
    }
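    /// Completes any outstanding reads or writes and closes the file. In write
    /// mode, this flushes remaining data, stores the metadata document, and
    /// ensures the GridFS indexes exist.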
    pub fn close(&mut self) -> Result<()> {
        // Flush any buffered writes before taking the file lock.
        if self.mode == Mode::Write {
            self.flush()?;
        }
        let _guard = self.mutex.lock()?;
        if self.mode == Mode::Write {
            if self.err_description()?.is_none() {
                if self.doc.upload_date.is_none() {
                    self.doc.upload_date = Some(Utc::now());
                }
                // `result` consumes the digest, so finalize a clone of the running sum.
                self.doc.md5 = hex::encode(self.wsum.clone().result());
                self.gfs.files.insert_one(self.doc.to_bson(), None)?;
                self.gfs.files.create_index(doc! { "filename": 1 }, None)?;
                // Each (files_id, n) pair must identify exactly one chunk.
                let mut opts = IndexOptions::new();
                opts.unique = Some(true);
                self.gfs.chunks.create_index(
                    doc! {
                        "files_id": 1,
                        "n": 1,
                    },
                    Some(opts),
                )?;
            } else {
                // A background write failed; remove any chunks that were already inserted.
                self.gfs.chunks.delete_many(
                    doc! { "files_id": self.doc.id.clone() },
                    None,
                )?;
            }
        }
        if self.mode == Mode::Read {
            if let Some(cache) = self.rcache.take() {
                // Block until any in-flight prefetch finishes before discarding it.
                let _ = cache.lock()?;
            }
        }
        self.mode = Mode::Closed;
        // Surface any error recorded by a background write.
        match self.err_description()? {
            Some(description) => Err(OperationError(description)),
            None => Ok(()),
        }
    }
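    // Inserts a chunk into GridFS on a background thread, tracking it in `wpending`.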
    fn insert_chunk(&self, n: i32, buf: &[u8]) -> Result<()> {
        self.wpending.fetch_add(1, Ordering::SeqCst);
        let document = doc! {
            "_id": oid::ObjectId::new()?,
            "files_id": self.doc.id.clone(),
            "n": n,
            "data": (BinarySubtype::Generic, buf.to_vec())
        };
        let arc_gfs = self.gfs.clone();
        let arc_mutex = self.mutex.clone();
        let arc_wpending = self.wpending.clone();
        let cvar = self.condvar.clone();
        let err = self.err.clone();
        thread::spawn(move || {
            let result = arc_gfs.chunks.insert_one(document, None);
            // Record completion under the file lock, then wake any waiting writer.
            let _guard = arc_mutex.lock();
            arc_wpending.fetch_sub(1, Ordering::SeqCst);
            if let Err(e) = result {
                if let Ok(mut err_mut) = err.write() {
                    err_mut.inner = Some(e);
                }
            }
            cvar.notify_all();
        });
        Ok(())
    }
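    /// Retrieves the binary data of a single chunk from GridFS.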
    pub fn find_chunk(&mut self, id: oid::ObjectId, chunk_num: i32) -> Result<Vec<u8>> {
        let filter = doc! {
            "files_id": id,
            "n": chunk_num,
        };
        match self.gfs.chunks.find_one(Some(filter), None)? {
            Some(doc) => match doc.get("data") {
                Some(&Bson::Binary(_, ref buf)) => Ok(buf.clone()),
                _ => Err(OperationError(String::from("Chunk contained no data."))),
            },
            None => Err(OperationError(String::from("Chunk not found."))),
        }
    }
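    // Returns the current chunk, preferring the prefetch cache, and schedules a
    // background prefetch of the next chunk.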
    fn get_chunk(&mut self) -> Result<Vec<u8>> {
        let id = self.doc.id.clone();
        let curr_chunk_num = self.chunk_num;
        // Use the prefetched chunk if it matches and loaded cleanly; otherwise
        // fall back to a direct query.
        let data = if let Some(lock) = self.rcache.take() {
            let cache = lock.lock()?;
            if cache.n == curr_chunk_num && cache.err.is_none() {
                cache.data.clone()
            } else {
                self.find_chunk(id, curr_chunk_num)?
            }
        } else {
            self.find_chunk(id, curr_chunk_num)?
        };
        self.chunk_num += 1;
        // If more chunks remain, prefetch the next one on a background thread.
        if (self.chunk_num as i64) * (self.doc.chunk_size as i64) < self.doc.len {
            let cache = Arc::new(Mutex::new(CachedChunk::new(self.chunk_num)));
            let arc_cache = cache.clone();
            let arc_gfs = self.gfs.clone();
            let id = self.doc.id.clone();
            let next_chunk_num = self.chunk_num;
            thread::spawn(move || {
                let mut cache = match arc_cache.lock() {
                    Ok(cache) => cache,
                    Err(_) => return,
                };
                let result = arc_gfs.chunks.find_one(
                    Some(doc! {
                        "files_id": id,
                        "n": next_chunk_num
                    }),
                    None,
                );
                match result {
                    Ok(Some(doc)) => match doc.get("data") {
                        Some(&Bson::Binary(_, ref buf)) => {
                            cache.data = buf.clone();
                            cache.err = None;
                        }
                        _ => {
                            cache.err =
                                Some(OperationError(String::from("Chunk contained no data.")))
                        }
                    },
                    Ok(None) => {
                        cache.err = Some(OperationError(String::from("Chunk not found.")))
                    }
                    Err(err) => cache.err = Some(err),
                }
            });
            self.rcache = Some(cache);
        }
        Ok(data)
    }
}
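// Writes buffer bytes until a full chunk accumulates; full chunks are inserted on
// background threads, with roughly one megabyte of writes in flight at most.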
impl io::Write for File {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.assert_mode(Mode::Write)?;
        let mut guard = match self.mutex.lock() {
            Ok(guard) => guard,
            Err(_) => return Err(io::Error::new(io::ErrorKind::Other, PoisonLockError)),
        };
        // Propagate any error recorded by an earlier background chunk write.
        if let Some(description) = self.err_description()? {
            return Err(io::Error::new(io::ErrorKind::Other, OperationError(description)));
        }
        let mut data = buf;
        let n = data.len();
        let chunk_size = self.doc.chunk_size as usize;
        self.doc.len += data.len() as i64;
        // If the incoming bytes do not complete a chunk, just buffer them.
        if self.wbuf.len() + data.len() < chunk_size {
            self.wbuf.extend(data.iter().cloned());
            return Ok(n);
        }
        // Top up the write buffer into a complete chunk and insert it.
        if !self.wbuf.is_empty() {
            let missing = cmp::min(chunk_size - self.wbuf.len(), data.len());
            let (part1, part2) = data.split_at(missing);
            self.wbuf.extend(part1.iter().cloned());
            data = part2;
            let curr_chunk_num = self.chunk_num;
            self.chunk_num += 1;
            // Hash the completed chunk, not the raw input slice.
            self.wsum.input(&self.wbuf);
            // Block while at least a megabyte of chunk writes is still in flight.
            while self.doc.chunk_size * self.wpending.load(Ordering::SeqCst) as i32 >=
                MEGABYTE as i32
            {
                guard = match self.condvar.wait(guard) {
                    Ok(guard) => guard,
                    Err(_) => return Err(io::Error::new(io::ErrorKind::Other, PoisonLockError)),
                };
                if let Some(description) = self.err_description()? {
                    return Err(io::Error::new(io::ErrorKind::Other, OperationError(description)));
                }
            }
            let chunk = self.wbuf.clone();
            self.insert_chunk(curr_chunk_num, &chunk)?;
            self.wbuf.clear();
        }
        // Insert full chunks directly from the input slice.
        while data.len() > chunk_size {
            let (part1, part2) = data.split_at(chunk_size);
            let curr_chunk_num = self.chunk_num;
            self.chunk_num += 1;
            self.wsum.input(part1);
            while self.doc.chunk_size * self.wpending.load(Ordering::SeqCst) as i32 >=
                MEGABYTE as i32
            {
                guard = match self.condvar.wait(guard) {
                    Ok(guard) => guard,
                    Err(_) => return Err(io::Error::new(io::ErrorKind::Other, PoisonLockError)),
                };
                if let Some(description) = self.err_description()? {
                    return Err(io::Error::new(io::ErrorKind::Other, OperationError(description)));
                }
            }
            self.insert_chunk(curr_chunk_num, part1)?;
            data = part2;
        }
        // Buffer whatever is left for a later write or flush.
        self.wbuf.extend(data.iter().cloned());
        Ok(n)
    }
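    // Flushes the remaining buffered bytes as a final chunk and waits for all
    // pending background writes to complete.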
    fn flush(&mut self) -> io::Result<()> {
        self.assert_mode(Mode::Write)?;
        let mut guard = match self.mutex.lock() {
            Ok(guard) => guard,
            Err(_) => return Err(io::Error::new(io::ErrorKind::Other, PoisonLockError)),
        };
        // Write any remaining buffered bytes as the final (possibly short) chunk.
        if !self.wbuf.is_empty() && self.err_description()?.is_none() {
            let chunk_num = self.chunk_num;
            self.chunk_num += 1;
            self.wsum.input(&self.wbuf);
            while self.doc.chunk_size * self.wpending.load(Ordering::SeqCst) as i32 >=
                MEGABYTE as i32
            {
                guard = match self.condvar.wait(guard) {
                    Ok(guard) => guard,
                    Err(_) => return Err(io::Error::new(io::ErrorKind::Other, PoisonLockError)),
                }
            }
            if self.err_description()?.is_none() {
                let chunk = self.wbuf.clone();
                self.insert_chunk(chunk_num, &chunk)?;
                self.wbuf.clear();
            }
        }
        // Wait for every outstanding chunk write to finish.
        while self.wpending.load(Ordering::SeqCst) > 0 {
            guard = match self.condvar.wait(guard) {
                Ok(guard) => guard,
                Err(_) => return Err(io::Error::new(io::ErrorKind::Other, PoisonLockError)),
            }
        }
        // Surface any error recorded by a background write.
        match self.err_description()? {
            Some(description) => Err(io::Error::new(
                io::ErrorKind::Other,
                OperationError(description),
            )),
            None => Ok(()),
        }
    }
}
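// Reads are served from an internal buffer that is refilled chunk by chunk,
// using the background prefetch cache where possible.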
impl io::Read for File {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.assert_mode(Mode::Read)?;
        // Bind the guard to a name so the lock is held for the whole read;
        // `let _ = ...` would drop it immediately.
        let _guard = match self.mutex.lock() {
            Ok(guard) => guard,
            Err(_) => return Err(io::Error::new(io::ErrorKind::Other, PoisonLockError)),
        };
        // The end of the file has been reached.
        if self.offset == self.doc.len {
            return Ok(0);
        }
        // Pull in chunks until the request can be satisfied or the file runs out.
        while self.rbuf.len() < buf.len() &&
            (self.chunk_num as i64) * (self.doc.chunk_size as i64) < self.doc.len
        {
            let chunk = self.get_chunk()?;
            self.rbuf.extend(chunk);
        }
        // Copy as much as fits into `buf`, then drop the consumed prefix.
        let i = (&mut *buf).write(&self.rbuf)?;
        self.offset += i as i64;
        self.rbuf.drain(..i);
        Ok(i)
    }
}
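// Ensure buffered data and metadata reach the server even if the file is
// dropped without an explicit `close`.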
impl Drop for File {
    fn drop(&mut self) {
        let _ = self.close();
    }
}
impl GfsFile {
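    /// Creates file metadata with the default chunk size and no filename.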
    pub fn new(id: oid::ObjectId) -> GfsFile {
        GfsFile {
            id,
            chunk_size: DEFAULT_CHUNK_SIZE,
            name: None,
            len: 0,
            md5: String::new(),
            aliases: Vec::new(),
            upload_date: None,
            content_type: None,
            metadata: None,
        }
    }
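    /// Creates file metadata with the given filename.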
    pub fn with_name(name: String, id: oid::ObjectId) -> GfsFile {
        GfsFile {
            id,
            chunk_size: DEFAULT_CHUNK_SIZE,
            name: Some(name),
            len: 0,
            md5: String::new(),
            aliases: Vec::new(),
            upload_date: None,
            content_type: None,
            metadata: None,
        }
    }
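    /// Builds file metadata from a document in the `files` collection.
    ///
    /// Panics if the document has no `_id`.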
    pub fn with_doc(doc: bson::Document) -> GfsFile {
        let mut file = match doc.get("_id") {
            Some(&Bson::ObjectId(ref id)) => GfsFile::new(id.clone()),
            _ => panic!("Document has no _id!"),
        };
        if let Some(&Bson::String(ref name)) = doc.get("filename") {
            file.name = Some(name.to_owned());
        }
        if let Some(&Bson::I32(chunk_size)) = doc.get("chunkSize") {
            file.chunk_size = chunk_size;
        }
        if let Some(&Bson::UtcDatetime(datetime)) = doc.get("uploadDate") {
            file.upload_date = Some(datetime);
        }
        if let Some(&Bson::I64(length)) = doc.get("length") {
            file.len = length;
        }
        if let Some(&Bson::String(ref hash)) = doc.get("md5") {
            file.md5 = hash.to_owned();
        }
        if let Some(&Bson::String(ref content_type)) = doc.get("contentType") {
            file.content_type = Some(content_type.to_owned());
        }
        if let Some(&Bson::Binary(_, ref metadata)) = doc.get("metadata") {
            file.metadata = Some(metadata.clone());
        }
        file
    }
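    /// Converts the file metadata into a BSON document for the `files` collection.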
    pub fn to_bson(&self) -> bson::Document {
        // `upload_date` is always set by `close` before this runs.
        let mut doc = doc! {
            "_id": self.id.clone(),
            "chunkSize": self.chunk_size,
            "length": self.len,
            "md5": self.md5.to_owned(),
            "uploadDate": self.upload_date.as_ref().unwrap().clone()
        };
        if let Some(name) = self.name.as_ref() {
            doc.insert("filename", name);
        }
        if let Some(content_type) = self.content_type.as_ref() {
            doc.insert("contentType", content_type);
        }
        if let Some(metadata) = self.metadata.as_ref() {
            doc.insert("metadata", (BinarySubtype::Generic, metadata.clone()));
        }
        doc
    }
}
impl CachedChunk {
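    // A fresh cache entry carries a placeholder error that is cleared once the
    // background fetch succeeds.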
    pub fn new(n: i32) -> CachedChunk {
        CachedChunk {
            n,
            data: Vec::new(),
            err: Some(Error::DefaultError(String::from(
                "Chunk has not yet been initialized",
            ))),
        }
    }
}