use std::ffi::OsString;
use std::fs::{File, OpenOptions};
use std::io::BufWriter;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Mutex, MutexGuard};
use std::{fmt, fs, mem};
use super::*;
/// Where the data of an `Output` currently lives.
#[derive(Debug)]
enum OutputState {
    /// Nothing has been written or staged yet.
    Empty,
    /// Data written through `Write` and kept in memory. It is moved to a file once it would
    /// exceed `MEM_BACKED_LIMIT`, or when `make_file_backed` is called.
    MemBacked(Vec<u8>),
    /// Data being written into the file at `Output::path`.
    FileBacked(BufWriter<File>),
    /// A file or directory was moved to `Output::path` by `stage_file` / `stage_directory`.
    StagedPath,
    /// The output is an existing object (set by `stage_input`); no data is copied.
    OtherObject(DataObjectId),
}
/// A single output of a task being executed: its spec, its data, and collected metadata.
#[derive(Debug)]
pub struct Output {
    /// Specification of the output object (id, data type, label, content type, ...).
    pub spec: ObjectSpec,
    /// Current location/state of the output data.
    data: OutputState,
    /// Metadata returned with the output (content type, user info, debug text).
    info: ObjectInfo,
    /// Staging location (`output-{session_id}_{id}` inside the stage directory); used by
    /// file-backed and staged outputs.
    path: PathBuf,
    /// Ordinal shown in the `Display` text; presumably the index among the task's
    /// outputs -- TODO confirm against the caller.
    order: usize,
}
impl fmt::Display for Output {
    /// Writes a short human-readable description of the output (ordinal, data type, id and
    /// label), used throughout this module in panic and error messages.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let spec = &self.spec;
        write!(
            f,
            "Output #{order} ({dtype:?} ID {id}, label {label:?})",
            order = self.order,
            dtype = spec.data_type,
            id = spec.id,
            label = spec.label
        )
    }
}
impl Output {
    /// Creates a new, still-empty output for `obj`.
    ///
    /// The staging path is `stage_path/output-{session_id}_{id}`. Nothing is created on
    /// disk here; the path is only used once the output becomes file-backed or staged.
    ///
    /// # Panics
    ///
    /// Panics if `obj.info` is already set.
    pub(crate) fn new(obj: LocalObjectIn, stage_path: &Path, order: usize) -> Self {
        assert!(obj.info.is_none());
        let path = stage_path.join(format!(
            "output-{}_{}",
            obj.spec.id.get_session_id(),
            obj.spec.id.get_id()
        ));
        Output {
            spec: obj.spec,
            data: OutputState::Empty,
            info: ObjectInfo::default(),
            path,
            order,
        }
    }

    /// Consumes the output and builds the `LocalObjectOut` describing its result.
    ///
    /// An output that was never written becomes an empty in-memory location. The second
    /// tuple element and `cache_hint` are always `false`.
    ///
    /// # Panics
    ///
    /// Panics if flushing the buffered data of a file-backed output fails.
    pub(crate) fn into_output_spec(self) -> (LocalObjectOut, bool) {
        let location = match self.data {
            OutputState::Empty => DataLocation::Memory(Vec::new()),
            OutputState::MemBacked(data) => DataLocation::Memory(data),
            OutputState::FileBacked(f) => {
                // Flush explicitly: dropping a `BufWriter` ignores flush errors, which would
                // silently hand out a truncated file.
                f.into_inner().expect("error flushing output file");
                DataLocation::Path(self.path)
            }
            OutputState::StagedPath => DataLocation::Path(self.path),
            OutputState::OtherObject(id) => DataLocation::OtherObject(id),
        };
        (
            LocalObjectOut {
                info: self.info,
                location: Some(location),
                cache_hint: false,
            },
            false,
        )
    }

    /// Moves the directory at `path` into this output's staging location.
    ///
    /// # Errors
    ///
    /// Returns an error if this output is not a directory output.
    ///
    /// # Panics
    ///
    /// Panics if `path` is not a directory, if the output was already staged, or if the
    /// rename fails.
    pub fn stage_directory<P: AsRef<Path>>(&mut self, path: P) -> TaskResult<()> {
        self.check_directory()?;
        let path: &Path = path.as_ref();
        if !path.is_dir() {
            panic!(
                "Path {:?} given to `stage_directory` on {} is not a readable directory.",
                path, self
            );
        }
        if !matchvar!(self.data, OutputState::Empty) {
            panic!(
                "Called `stage_directory` on {} after being previously staged.",
                self
            );
        }
        fs::rename(path, &self.path).unwrap_or_else(|e| {
            panic!(
                "error moving directory {:?} to staging ({:?}) on {}: {}",
                path, self.path, self, e
            )
        });
        self.data = OutputState::StagedPath;
        Ok(())
    }

    /// Moves the regular file at `path` into this output's staging location.
    ///
    /// # Errors
    ///
    /// Returns an error if this output is not a blob output.
    ///
    /// # Panics
    ///
    /// Panics if `path` is not a regular file, if the output was already staged or written
    /// to, or if the rename fails.
    pub fn stage_file<P: AsRef<Path>>(&mut self, path: P) -> TaskResult<()> {
        self.check_blob()?;
        let path: &Path = path.as_ref();
        if !path.is_file() {
            panic!(
                "Path {:?} given to `stage_file` on {} is not a readable regular file.",
                path, self
            );
        }
        if !matchvar!(self.data, OutputState::Empty) {
            panic!(
                "Called `stage_file` on {} after being previously staged or written to.",
                self
            );
        }
        fs::rename(path, &self.path).unwrap_or_else(|e| {
            panic!(
                "error moving file {:?} to staging ({:?}) on {}: {}",
                path, self.path, self, e
            )
        });
        self.data = OutputState::StagedPath;
        Ok(())
    }

    /// Makes this output refer to the existing input `object` instead of carrying data.
    ///
    /// # Errors
    ///
    /// Returns an error if the data types of `object` and this output differ.
    ///
    /// # Panics
    ///
    /// Panics if the output was already staged or written to.
    pub fn stage_input(&mut self, object: &DataInstance) -> TaskResult<()> {
        if self.spec.data_type != object.spec.data_type {
            bail!(
                "Can't stage input {} as output {}: data type mismatch.",
                object.spec.id,
                self
            )
        }
        if !matchvar!(self.data, OutputState::Empty) {
            panic!(
                "Called `stage_input` on {} after being previously staged or written to.",
                self
            );
        }
        self.data = OutputState::OtherObject(object.spec.id);
        Ok(())
    }

    /// Discards all data and metadata (except `info.debug`) after a task failure.
    ///
    /// Anything placed at the staging path (a file-backed blob, a staged file or a staged
    /// directory) is deleted.
    ///
    /// # Panics
    ///
    /// Panics if removing the staged path fails.
    pub(crate) fn cleanup_failed_task(&mut self) {
        let staged_on_disk = match self.data {
            OutputState::FileBacked(_) | OutputState::StagedPath => true,
            _ => false,
        };
        // Reset first so an open `BufWriter` is dropped (closing its file) before deletion.
        self.data = OutputState::Empty;
        if staged_on_disk {
            // Blob outputs stage a regular file, directory outputs a directory;
            // `remove_dir_all` fails when its root is a plain file.
            let removed = if self.path.is_dir() {
                fs::remove_dir_all(&self.path)
            } else {
                fs::remove_file(&self.path)
            };
            removed.expect("error removing staged path on task failure");
        }
        let debug = self.info.debug.clone();
        self.info = ObjectInfo::default();
        self.info.debug = debug;
    }

    /// Returns `Ok(())` if this is a directory output.
    ///
    /// # Errors
    ///
    /// Returns an error for any other data type.
    pub fn check_directory(&self) -> TaskResult<()> {
        if self.spec.data_type == DataType::Directory {
            Ok(())
        } else {
            bail!("The output {} expects a directory.", self)
        }
    }

    /// Returns `Ok(())` if this is a blob output.
    ///
    /// # Errors
    ///
    /// Returns an error for any other data type.
    pub fn check_blob(&self) -> TaskResult<()> {
        if self.spec.data_type == DataType::Blob {
            Ok(())
        } else {
            bail!("The output {} expects a file or a data blob.", self)
        }
    }

    /// Returns the effective content type.
    ///
    /// For a blob this is the type set via `set_content_type` if any, otherwise the one from
    /// the spec. For non-blob outputs it is always the empty string.
    pub fn get_content_type(&self) -> String {
        if self.spec.data_type != DataType::Blob {
            return "".into();
        }
        if !self.info.content_type.is_empty() {
            self.info.content_type.clone()
        } else {
            self.spec.content_type.clone()
        }
    }

    /// Sets the content type of a blob output; it may only be set once.
    ///
    /// # Errors
    ///
    /// Returns an error if this is not a blob output or if the content type was already set.
    pub fn set_content_type(&mut self, ctype: impl Into<String>) -> TaskResult<()> {
        self.check_blob()?;
        if !self.info.content_type.is_empty() {
            bail!("The content type of {} has been already set.", self);
        }
        self.info.content_type = ctype.into();
        Ok(())
    }

    /// Inserts (or replaces) a user-defined metadata entry.
    pub fn set_user_info(&mut self, key: impl Into<String>, val: UserValue) {
        self.info.user.insert(key.into(), val);
    }

    /// Moves in-memory data into a newly created file at the staging path.
    ///
    /// The file must not exist yet (`create_new`). On I/O error the state stays `MemBacked`.
    ///
    /// # Panics
    ///
    /// Panics if the output is not memory-backed.
    fn convert_to_file(&mut self) -> ::std::io::Result<()> {
        assert!(matchvar!(self.data, OutputState::MemBacked(_)));
        let mut f = BufWriter::new(
            OpenOptions::new()
                .write(true)
                .create_new(true)
                .open(&self.path)?,
        );
        if let OutputState::MemBacked(ref data) = self.data {
            f.write_all(data)?;
        } else {
            unreachable!();
        }
        self.data = OutputState::FileBacked(f);
        Ok(())
    }

    /// Forces a blob output to be written into a file (creating it if still empty).
    ///
    /// # Errors
    ///
    /// Returns an error if this is not a blob output.
    ///
    /// # Panics
    ///
    /// Panics if the output was staged from an input, file or directory, or if writing the
    /// file fails.
    pub fn make_file_backed(&mut self) -> TaskResult<()> {
        self.check_blob()?;
        if matchvar!(self.data, OutputState::Empty) {
            self.data = OutputState::MemBacked(Vec::new());
        }
        match self.data {
            OutputState::MemBacked(_) => self
                .convert_to_file()
                .expect("error writing output to file"),
            OutputState::FileBacked(_) => (),
            _ => panic!(
                "can't make output {} file backed: it has been staged with input, file or dir",
                self
            ),
        }
        Ok(())
    }
}
/// Streams data into the output.
///
/// Data is kept in memory until the total would exceed `MEM_BACKED_LIMIT`. After that it is
/// moved to the staging file, and all subsequent writes go to the file.
impl Write for Output {
    /// Appends `buf` to the output, spilling to disk on overflow of the memory limit.
    ///
    /// # Panics
    ///
    /// Panics if the output was staged from an input, file or directory.
    fn write(&mut self, buf: &[u8]) -> ::std::io::Result<usize> {
        if matchvar!(self.data, OutputState::Empty) {
            self.data = OutputState::MemBacked(Vec::new());
        }
        let spill = match self.data {
            OutputState::MemBacked(ref data) => data.len() + buf.len() > MEM_BACKED_LIMIT,
            _ => false,
        };
        if spill {
            self.convert_to_file()?;
        }
        match self.data {
            OutputState::MemBacked(ref mut data) => data.write(buf),
            OutputState::FileBacked(ref mut f) => f.write(buf),
            _ => panic!(
                "can't write to output {} that has been staged with input, file or dir",
                self
            ),
        }
    }

    /// Flushes the file buffer of a file-backed output; no-op in every other state.
    fn flush(&mut self) -> ::std::io::Result<()> {
        match self.data {
            OutputState::FileBacked(ref mut f) => f.flush(),
            _ => Ok(()),
        }
    }
}