use futures::unsync::oneshot;
pub use rain_core::common_capnp::TaskState;
use rain_core::{errors::*, types::*, utils::*};
use std::fmt;
use super::{DataObjectRef, DataObjectState, GovernorRef, SessionRef};
use wrapped::WrappedRcRefCell;
/// Server-side record of a single task: its state, the data objects it reads and
/// produces, and its links to a governor and to the owning session.
#[derive(Debug)]
pub struct Task {
/// Current state of the task.
pub(in super::super) state: TaskState,
/// Data objects consumed by the task.
pub(in super::super) inputs: Vec<DataObjectRef>,
/// Data objects produced by the task; each of them has this task as `producer`.
pub(in super::super) outputs: Vec<DataObjectRef>,
/// Inputs the task is still waiting for. `TaskRef::new` seeds the set with every
/// input that is `Unfinished` at creation time.
pub(in super::super) waiting_for: RcSet<DataObjectRef>,
/// Governor the task is assigned to, if any. The consistency check expects that
/// governor's `assigned_tasks` to contain this task.
pub(in super::super) assigned: Option<GovernorRef>,
/// Governor the task is scheduled on, if any. The consistency check expects that
/// governor's `scheduled_tasks` to contain this task.
pub(in super::super) scheduled: Option<GovernorRef>,
/// Session the task belongs to; the session's `tasks` set contains this task.
pub(in super::super) session: SessionRef,
/// Senders notified by `trigger_finish_hooks`; `wait` adds one per caller while
/// the task is not yet finished.
pub(in super::super) finish_hooks: Vec<FinishHook>,
/// Task specification; carries the task ID and is what gets sent to governors.
pub(in super::super) spec: TaskSpec,
/// Additional task information; default-initialised when the task is created.
pub(in super::super) info: TaskInfo,
}
/// Handle to a `Task` wrapped in `WrappedRcRefCell`. The task is read through
/// `get()`, modified through `get_mut()`, and shared by cloning the handle.
pub type TaskRef = WrappedRcRefCell<Task>;
impl Task {
    /// Fills a governor-protocol task builder from this task.
    ///
    /// Only the spec is transferred: it is serialized to JSON and stored with
    /// `set_spec`.
    ///
    /// # Panics
    /// Panics if the spec cannot be serialized to JSON.
    pub fn to_governor_capnp(&self, builder: &mut ::rain_core::governor_capnp::task::Builder) {
        let spec_json = ::serde_json::to_string(&self.spec).unwrap();
        builder.set_spec(&spec_json);
    }

    /// Returns the task ID stored in the spec.
    #[inline]
    pub fn id(&self) -> TaskId {
        self.spec.id
    }

    /// Returns a reference to the task specification.
    #[inline]
    pub fn spec(&self) -> &TaskSpec {
        &self.spec
    }

    /// Returns `true` only when the task is in the `Finished` state.
    #[inline]
    pub fn is_finished(&self) -> bool {
        self.state == TaskState::Finished
    }

    /// Notifies every registered finish hook and leaves the hook list empty.
    ///
    /// A hook whose receiving side is already gone is only logged at debug level.
    ///
    /// # Panics
    /// Panics if the task is not finished.
    pub fn trigger_finish_hooks(&mut self) {
        assert!(self.is_finished());
        // Take the hooks out first so the list is empty once we are done.
        let hooks = ::std::mem::replace(&mut self.finish_hooks, Vec::new());
        for hook in hooks {
            if hook.send(()).is_err() {
                debug!("Failed to inform about finishing task");
            }
        }
    }

    /// Returns a receiver that is signalled once the task is finished.
    ///
    /// For an already finished task the signal is sent immediately; otherwise the
    /// sending half is stored as a finish hook.
    pub fn wait(&mut self) -> oneshot::Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        if self.state == TaskState::Finished {
            // The receiver is still alive in this scope, so the send cannot fail.
            sender.send(()).unwrap();
        } else {
            self.finish_hooks.push(sender);
        }
        receiver
    }
}
impl TaskRef {
    /// Creates a task in `session` and links it into the task graph.
    ///
    /// All validation happens before anything is modified, so when an error is
    /// returned neither the session nor any data object has been touched. On
    /// success the task is
    /// * registered in the session (`tasks`, `unfinished_tasks`),
    /// * added to the `consumers` and `need_by` sets of every input, and
    /// * set as the `producer` of every output.
    ///
    /// The initial state is `Ready` when no input is `Unfinished`, otherwise
    /// `NotAssigned`.
    ///
    /// # Errors
    /// Returns an error when
    /// * an input object is already `Removed`,
    /// * an input or output object belongs to a different session than the task,
    /// * an output object already has a producer.
    ///
    /// # Panics
    /// Panics if the session ID embedded in `spec.id` differs from the ID of
    /// `session`.
    pub fn new(
        session: &SessionRef,
        spec: TaskSpec,
        inputs: Vec<DataObjectRef>,
        outputs: Vec<DataObjectRef>,
    ) -> Result<Self> {
        assert_eq!(spec.id.get_session_id(), session.get_id());

        // Validate the inputs and collect those that are not finished yet.
        let mut waiting = RcSet::new();
        for i in inputs.iter() {
            let inobj = i.get();
            match inobj.state {
                DataObjectState::Removed => {
                    bail!(
                        "Can't create Task {} with Removed input object {}",
                        spec.id,
                        inobj.spec.id
                    );
                }
                DataObjectState::Finished => {}
                DataObjectState::Unfinished => {
                    waiting.insert(i.clone());
                }
            }
            if inobj.spec.id.get_session_id() != spec.id.get_session_id() {
                bail!(
                    "Input object {} for task {} is from a different session",
                    inobj.spec.id,
                    spec.id
                );
            }
        }

        // Validate the outputs: each must be unproduced and from the same session.
        for out in outputs.iter() {
            let o = out.get();
            if let Some(ref prod) = o.producer {
                bail!(
                    "Object {} already has producer (task {}) when creating task {}",
                    o.spec.id,
                    prod.get().spec.id,
                    spec.id
                );
            }
            if o.spec.id.get_session_id() != spec.id.get_session_id() {
                bail!(
                    "Output object {} for task {} is from a different session",
                    o.spec.id,
                    spec.id
                );
            }
        }

        let state = if waiting.is_empty() {
            TaskState::Ready
        } else {
            TaskState::NotAssigned
        };
        let sref = TaskRef::wrap(Task {
            spec,
            info: Default::default(),
            state,
            inputs,
            outputs,
            waiting_for: waiting,
            assigned: None,
            scheduled: None,
            session: session.clone(),
            finish_hooks: Default::default(),
        });

        // Register the task in its session.
        {
            let mut s = session.get_mut();
            s.tasks.insert(sref.clone());
            s.unfinished_tasks += 1;
        }

        // Link the task to its data objects. The task itself is only read here,
        // so a shared borrow is sufficient.
        {
            let task = sref.get();
            for i in task.inputs.iter() {
                let mut o = i.get_mut();
                o.consumers.insert(sref.clone());
                o.need_by.insert(sref.clone());
            }
            for out in task.outputs.iter() {
                out.get_mut().producer = Some(sref.clone());
            }
        }
        Ok(sref)
    }

    /// Removes the task from the governor it is scheduled on and clears
    /// `scheduled`. Does nothing but clear the field when the task is not
    /// scheduled.
    ///
    /// The task is removed from the governor's `scheduled_tasks`, additionally
    /// from `scheduled_ready_tasks` when it is `Ready`, and unless it is
    /// `NotAssigned` its CPU count is subtracted from the governor's
    /// `active_resources`.
    ///
    /// # Panics
    /// Panics if the governor's sets do not contain the task as expected.
    pub fn unschedule(&self) {
        let mut inner = self.get_mut();
        if let Some(ref w) = inner.scheduled {
            // One mutable borrow of the governor covers all the bookkeeping.
            let mut governor = w.get_mut();
            assert!(governor.scheduled_tasks.remove(self));
            if inner.state == TaskState::Ready {
                assert!(governor.scheduled_ready_tasks.remove(self));
            }
            if inner.state != TaskState::NotAssigned {
                governor.active_resources -= inner.spec().resources.cpus();
            }
        }
        inner.scheduled = None;
    }

    /// Detaches the task from the graph: unschedules it, clears `producer` on its
    /// outputs, removes it from the `consumers` of its inputs and from the
    /// session's `tasks`, and drops all finish hooks without notifying them.
    ///
    /// # Panics
    /// Panics if the task is assigned to a governor or if the session does not
    /// contain the task.
    pub fn unlink(&self) {
        self.unschedule();
        let mut inner = self.get_mut();
        assert!(
            inner.assigned.is_none(),
            "Can only unlink non-assigned tasks."
        );
        // Detach from the outputs.
        for o in inner.outputs.iter() {
            let mut obj = o.get_mut();
            debug_assert!(obj.producer == Some(self.clone()));
            obj.producer = None;
        }
        // Detach from the inputs.
        // NOTE(review): `new` also inserts the task into each input's `need_by`
        // and increments the session's `unfinished_tasks`; neither is reverted
        // here. Confirm that this is intended / handled by the callers.
        for i in inner.inputs.iter() {
            i.get_mut().consumers.remove(self);
        }
        // Detach from the owning session.
        assert!(inner.session.get_mut().tasks.remove(self));
        // Dropping the senders; the waiting receivers are not signalled.
        inner.finish_hooks.clear();
    }
}
impl ConsistencyCheck for TaskRef {
    /// Verifies that this task agrees with everything it is linked to.
    ///
    /// The checks run in this order and the first violation is returned as an
    /// error:
    /// 1. the session ID inside the task ID matches the owning session,
    /// 2. the assigned governor lists the task in `assigned_tasks`,
    /// 3. the scheduled governor lists the task in `scheduled_tasks`, and lists it
    ///    in `scheduled_ready_tasks` exactly when the task is `Ready`,
    /// 4. the session lists the task,
    /// 5. every input is in `waiting_for` exactly when it is `Unfinished`, and no
    ///    input is `Removed` unless the task is `Finished`,
    /// 6. every output names this task as producer and is not done before the task,
    /// 7. `state`, `assigned` and `waiting_for` fit together,
    /// 8. a `Finished` task has no finish hooks left,
    /// 9. an assigned task is also scheduled, unless it is `Failed`.
    fn check_consistency(&self) -> Result<()> {
        debug!("Checking Task {:?} consistency", self);
        let task = self.get();

        if task.spec.id.get_session_id() != task.session.get_id() {
            bail!("ID and Session ID mismatch in {:?}", task);
        }

        // Links to governors must be mirrored on the governor side.
        if let Some(ref governor_ref) = task.assigned {
            let listed = governor_ref.get().assigned_tasks.contains(self);
            if !listed {
                bail!("assigned asymmetry in {:?}", task);
            }
        }
        if let Some(ref governor_ref) = task.scheduled {
            let governor = governor_ref.get();
            if !governor.scheduled_tasks.contains(self) {
                bail!("scheduled asymmetry with {:?} in {:?}", governor_ref, task);
            }
            let is_ready = task.state == TaskState::Ready;
            if governor.scheduled_ready_tasks.contains(self) != is_ready {
                bail!("scheduled_ready_task inconsistency in {:?} at {:?}", task, governor);
            }
        }

        if !task.session.get().tasks.contains(self) {
            bail!("session assymetry in {:?}", task);
        }

        for input in task.inputs.iter() {
            let obj = input.get();
            let removed = obj.state == DataObjectState::Removed;
            let done = removed || obj.state == DataObjectState::Finished;
            if removed && task.state != TaskState::Finished {
                bail!("waiting for removed object {:?} in {:?}", obj, task);
            }
            // An input must be waited for exactly when it is not done.
            if done == task.waiting_for.contains(input) {
                bail!(
                    "waiting_for all unfinished inputs invalid woth {:?} in {:?}",
                    obj,
                    task
                );
            }
        }

        for output in task.outputs.iter() {
            let obj = output.get();
            if obj.producer != Some(self.clone()) {
                bail!("output/producer incosistency of {:?} in {:?}", obj, task);
            }
            let done = obj.state == DataObjectState::Finished
                || obj.state == DataObjectState::Removed;
            if done && task.state != TaskState::Finished {
                bail!(
                    "data object {:?} done/removed before the task has finished in {:?}",
                    output,
                    task
                );
            }
        }

        let unassigned = task.assigned.is_none();
        let nothing_pending = task.waiting_for.is_empty();
        let state_ok = match task.state {
            TaskState::NotAssigned => {
                unassigned && (!nothing_pending || task.inputs.is_empty())
            }
            TaskState::Ready | TaskState::Finished => unassigned && nothing_pending,
            TaskState::Assigned | TaskState::Running => !unassigned && nothing_pending,
            TaskState::Failed => nothing_pending,
        };
        if !state_ok {
            bail!("state/assigned/waiting_for inconsistency in {:?}", task);
        }

        if task.state == TaskState::Finished && !task.finish_hooks.is_empty() {
            bail!("nonempty finish_hooks in Finished {:?}", task);
        }

        if !unassigned && task.scheduled.is_none() && task.state != TaskState::Failed {
            bail!("assigned/scheduled inconsistency in {:?}", task);
        }
        Ok(())
    }
}
impl fmt::Debug for TaskRef {
    /// Formats the handle as `TaskRef <id>`, reading the ID from the wrapped task.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let task_id = self.get().id();
        write!(f, "TaskRef {}", task_id)
    }
}