use futures::unsync::oneshot::Receiver;
use rain_core::{errors::*, types::*, utils::*};
use std::fmt;
use super::{ClientRef, DataObjectRef, DataObjectState, TaskRef, TaskState};
use wrapped::WrappedRcRefCell;
#[derive(Debug)]
pub struct Session {
pub(in super::super) id: SessionId,
pub(in super::super) error: Option<SessionError>,
pub(in super::super) tasks: RcSet<TaskRef>,
pub(in super::super) objects: RcSet<DataObjectRef>,
pub(in super::super) client: ClientRef,
pub(in super::super) unfinished_tasks: usize,
pub(in super::super) finish_hooks: Vec<FinishHook>,
pub(in super::super) spec: SessionSpec,
}
pub type SessionRef = WrappedRcRefCell<Session>;
impl Session {
pub fn get_error(&self) -> &Option<SessionError> {
&self.error
}
#[inline]
pub fn is_failed(&self) -> bool {
self.error.is_some()
}
}
impl Session {
pub fn wait(&mut self) -> Receiver<()> {
let (sender, receiver) = ::futures::unsync::oneshot::channel();
if self.unfinished_tasks == 0 {
sender.send(()).unwrap();
} else {
self.finish_hooks.push(sender);
}
receiver
}
pub fn task_finished(&mut self) {
assert!(self.unfinished_tasks > 0);
self.unfinished_tasks -= 1;
if self.unfinished_tasks == 0 {
for sender in ::std::mem::replace(&mut self.finish_hooks, Vec::new()) {
sender.send(()).unwrap();
}
}
}
}
impl SessionRef {
pub fn new(id: SessionId, client: &ClientRef, spec: SessionSpec) -> Self {
let s = SessionRef::wrap(Session {
id: id,
spec,
tasks: Default::default(),
objects: Default::default(),
client: client.clone(),
unfinished_tasks: 0,
finish_hooks: Default::default(),
error: None,
});
client.get_mut().sessions.insert(s.clone());
s
}
pub fn get_id(&self) -> SessionId {
self.get().id
}
pub fn unlink(&self) {
let mut inner = self.get_mut();
assert!(inner.objects.is_empty(), "Can only unlink empty session.");
assert!(inner.tasks.is_empty(), "Can only unlink empty session.");
assert!(inner.client.get_mut().sessions.remove(&self));
inner.finish_hooks.clear();
}
}
impl ConsistencyCheck for SessionRef {
fn check_consistency(&self) -> Result<()> {
let s = self.get();
for oref in s.objects.iter() {
if oref.get().session != *self {
bail!("session ref {:?} inconsistency in {:?}", oref, s)
}
}
for tref in s.tasks.iter() {
if tref.get().session != *self {
bail!("session ref {:?} inconsistency in {:?}", tref, s)
}
}
if !s.client.get().sessions.contains(self) {
bail!("owning client does not contain {:?}", s);
}
if !s.finish_hooks.is_empty()
&& s.tasks
.iter()
.all(|tr| tr.get().state == TaskState::Finished)
&& s.objects
.iter()
.all(|or| or.get().state != DataObjectState::Unfinished)
{
bail!("finish_hooks on all-finished session {:?}", s);
}
if s.error.is_some()
&& !(s.finish_hooks.is_empty() && s.tasks.is_empty() && s.objects.is_empty())
{
bail!("Session with error is not cleared: {:?}", s);
}
Ok(())
}
}
impl fmt::Debug for SessionRef {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SessionRef {}", self.get_id())
}
}