use rain_core::{comm::*, errors::*, types::*, utils::*};
use std::fmt;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use super::{Graph, TaskRef};
use governor::data::Data;
use governor::graph::ExecutorRef;
use governor::WorkDir;
use wrapped::WrappedRcRefCell;
#[derive(Debug)]
pub enum DataObjectState {
Assigned,
Remote(SocketAddr),
Pulling((SocketAddr, ::futures::unsync::oneshot::Sender<()>)),
Finished(Arc<Data>),
Removed,
}
#[derive(Debug)]
pub struct DataObject {
pub(in super::super) spec: ObjectSpec,
pub(in super::super) state: DataObjectState,
pub(in super::super) info: ObjectInfo,
pub(in super::super) consumers: RcSet<TaskRef>,
pub(in super::super) assigned: bool,
pub(in super::super) executor_cache: RcSet<ExecutorRef>,
}
pub type DataObjectRef = WrappedRcRefCell<DataObject>;
impl DataObject {
pub fn set_as_removed(&mut self) {
self.state = DataObjectState::Removed;
}
pub fn set_data(&mut self, data: Arc<Data>) -> Result<()> {
if self.is_removed() {
return Ok(());
}
assert!(!self.is_finished());
if self.spec.data_type != data.data_type() {
bail!(
"Output '{}' (content_type={}) expects type {}, but {} is provided",
self.spec.label,
self.content_type(),
self.spec.data_type,
data.data_type(),
)
}
self.info.size = Some(data.size());
self.state = DataObjectState::Finished(data);
Ok(())
}
pub fn content_type(&self) -> &String {
&self.spec.content_type
}
#[inline]
pub fn is_finished(&self) -> bool {
match self.state {
DataObjectState::Finished(_) => true,
_ => false,
}
}
#[inline]
pub fn is_removed(&self) -> bool {
match self.state {
DataObjectState::Removed => true,
_ => false,
}
}
pub fn data(&self) -> &Arc<Data> {
match self.state {
DataObjectState::Finished(ref data) => data,
_ => panic!("DataObject is not finished"),
}
}
pub fn remote(&self) -> Option<GovernorId> {
match self.state {
DataObjectState::Remote(ref addr) | DataObjectState::Pulling((ref addr, _)) => {
Some(*addr)
}
_ => None,
}
}
pub fn set_data_by_fs_move(
&mut self,
source_path: &Path,
info_path: Option<&str>,
work_dir: &WorkDir,
) -> Result<()> {
let metadata = ::std::fs::metadata(source_path).map_err(|_| {
ErrorKind::Msg(format!(
"Path '{}' now found.",
info_path.unwrap_or_else(|| source_path.to_str().unwrap())
))
})?;
let target_path = work_dir.new_path_for_dataobject();
let data = Data::new_by_fs_move(source_path, &metadata, target_path, work_dir.data_path())?;
self.set_data(Arc::new(data))
}
pub fn create_input_spec(&self, executor_ref: &ExecutorRef) -> LocalObjectIn {
LocalObjectIn {
spec: self.spec.clone(),
info: Some(self.info.clone()),
location: if self.executor_cache.contains(executor_ref) {
Some(DataLocation::Cached)
} else {
Some(self.data().create_location())
},
cache_hint: true,
}
}
pub fn create_output_spec(&self) -> LocalObjectIn {
LocalObjectIn {
spec: self.spec.clone(),
info: None,
location: None,
cache_hint: true,
}
}
}
impl DataObjectRef {
pub fn new(
graph: &mut Graph,
spec: ObjectSpec,
state: DataObjectState,
assigned: bool,
) -> Self {
debug!("New object id={}", spec.id);
match graph.objects.entry(spec.id) {
::std::collections::hash_map::Entry::Vacant(e) => {
let dataobj = Self::wrap(DataObject {
spec,
info: Default::default(),
state,
assigned,
consumers: Default::default(),
executor_cache: Default::default(),
});
e.insert(dataobj.clone());
dataobj
}
::std::collections::hash_map::Entry::Occupied(e) => {
let dataobj = e.get().clone();
{
let obj = dataobj.get();
assert!(obj.spec.id == spec.id);
}
dataobj
}
}
}
}
impl fmt::Debug for DataObjectRef {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "DataObjectRef {}", self.get().spec.id)
}
}