1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
use futures::unsync::oneshot;
pub use rain_core::common_capnp::DataObjectState;
use rain_core::{errors::*, types::*, utils::*};
use super::{GovernorRef, SessionRef, TaskRef, TaskState};
use wrapped::WrappedRcRefCell;
/// Server-side record of one data object: its spec and info, lifecycle state, and its links
/// to the producing/consuming tasks, to governors, and to the owning session.
#[derive(Debug)]
pub struct DataObject {
/// Object specification; `spec.id` identifies the object and encodes its session id.
pub(in super::super) spec: ObjectSpec,
/// Additional object information; `info.size` is set from the data length when data is
/// supplied at creation.
pub(in super::super) info: ObjectInfo,
/// Producer task, if any.
pub(in super::super) producer: Option<TaskRef>,
/// Current state.
pub(in super::super) state: DataObjectState,
/// Consumer set, e.g. to notify of completion.
pub(in super::super) consumers: RcSet<TaskRef>,
/// Tasks that still need this object's data; a non-empty set makes `is_needed()` true.
pub(in super::super) need_by: RcSet<TaskRef>,
/// Governors scheduled to have a full copy of this object.
pub(in super::super) scheduled: RcSet<GovernorRef>,
/// Governors that have been instructed to pull this object or already have it.
/// Superset of `located`.
pub(in super::super) assigned: RcSet<GovernorRef>,
/// Governors with full copy of this object.
pub(in super::super) located: RcSet<GovernorRef>,
/// Assigned session. Must match the session id encoded in `spec.id`.
pub(in super::super) session: SessionRef,
/// The object is requested to be kept by the client.
pub(in super::super) client_keep: bool,
/// Senders registered through `wait()`; drained and notified by `trigger_finish_hooks()`
/// when the object is finished.
pub(in super::super) finish_hooks: Vec<FinishHook>,
/// Optional *final* data when submitted from client or downloaded
/// by the server (for any reason thinkable).
pub(in super::super) data: Option<Vec<u8>>,
}
impl DataObject {
    /// Fill a governor-protocol capnp `data_object` builder with this object's state and
    /// its JSON-serialized spec.
    ///
    /// It does not fill `placement` and `assigned`; that must be done by the caller.
    ///
    /// # Panics
    /// Panics if `spec` cannot be serialized to JSON.
    pub fn to_governor_capnp(
        &self,
        builder: &mut ::rain_core::governor_capnp::data_object::Builder,
    ) {
        builder.set_state(self.state);
        builder.set_spec(&::serde_json::to_string(&self.spec).unwrap());
    }

    /// Inform all observers registered via `wait()` that the object is finished.
    ///
    /// Every stored hook is removed and sent a `()`. A failed send (the receiving side is
    /// gone) is only logged at debug level, since it is not fatal.
    pub fn trigger_finish_hooks(&mut self) {
        debug!("trigger_finish_hooks for {:?}", self);
        for sender in self.finish_hooks.drain(..) {
            if let Err(e) = sender.send(()) {
                // Non-fatal: nobody is waiting on this hook any more.
                debug!("Failed to inform about finishing dataobject: {:?}", e);
            }
        }
        assert!(self.finish_hooks.is_empty());
    }

    /// Return a receiver that completes once this object is finished.
    ///
    /// For an already `Finished` object the receiver is completed immediately; otherwise
    /// the sender is stored in `finish_hooks` until `trigger_finish_hooks()` runs.
    ///
    /// # Panics
    /// Panics when called on a `Removed` object.
    pub fn wait(&mut self) -> oneshot::Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        match self.state {
            // The receiver is still alive here, so this send cannot fail.
            DataObjectState::Finished => sender.send(()).unwrap(),
            DataObjectState::Removed => panic!("waiting on Removed object"),
            _ => self.finish_hooks.push(sender),
        };
        receiver
    }

    /// Is the object's data still needed by the client (keep flag) or by future tasks
    /// (non-empty `need_by`)?
    ///
    /// Scheduling is not accounted here. The object state is not checked; the question is
    /// meaningful for Finished objects.
    #[inline]
    pub fn is_needed(&self) -> bool {
        self.client_keep || !self.need_by.is_empty()
    }

    /// The object's id, taken from its spec.
    #[inline]
    pub fn id(&self) -> DataObjectId {
        self.spec.id
    }
}
pub type DataObjectRef = WrappedRcRefCell<DataObject>;
impl DataObjectRef {
/// Create new data object and link it to the owning session.
pub fn new(
session: &SessionRef,
spec: ObjectSpec,
client_keep: bool,
data: Option<Vec<u8>>,
) -> Self {
assert_eq!(spec.id.get_session_id(), session.get_id());
let size = data.as_ref().map(|d| d.len());
let obj = DataObjectRef::wrap(DataObject {
spec: spec,
info: Default::default(),
producer: Default::default(),
state: if data.is_none() {
DataObjectState::Unfinished
} else {
DataObjectState::Finished
},
consumers: Default::default(),
need_by: Default::default(),
scheduled: Default::default(),
located: Default::default(),
assigned: Default::default(),
session: session.clone(),
client_keep: client_keep,
finish_hooks: Vec::new(),
data: data,
});
if size.is_some() {
obj.get_mut().info.size = size;
}
// add to session
session.get_mut().objects.insert(obj.clone());
obj
}
pub fn unschedule(&self) {
let mut inner = self.get_mut();
for w in &inner.scheduled {
w.get_mut().scheduled_objects.remove(&self);
}
inner.scheduled.clear();
}
/// Check that no compulsory links exist and remove from owner.
/// Clears (and fails) any finish_hooks. Leaves the unlinked object in in consistent state.
pub fn unlink(&self) {
self.unschedule();
let mut inner = self.get_mut();
assert!(
inner.assigned.is_empty(),
"Can only remove non-assigned objects."
);
assert!(
inner.located.is_empty(),
"Can only remove non-located objects."
);
assert!(
inner.consumers.is_empty(),
"Can only remove objects without consumers."
);
assert!(
inner.producer.is_none(),
"Can only remove objects without a producer."
);
// remove from owner
assert!(inner.session.get_mut().objects.remove(&self));
// clear finish_hooks
inner.finish_hooks.clear();
}
}
impl ConsistencyCheck for DataObjectRef {
    /// Check for state and relationships consistency. Only explores adjacent objects but still
    /// may be slow.
    ///
    /// # Errors
    /// Returns an error describing the first violated invariant.
    fn check_consistency(&self) -> Result<()> {
        let s = self.get();

        // ID consistency
        if s.spec.id.get_session_id() != s.session.get_id() {
            bail!("ID and Session ID mismatch in {:?}", s);
        }

        // reference symmetries
        for wr in s.assigned.iter() {
            if !wr.get().assigned_objects.contains(self) {
                bail!("assigned asymmetry in {:?}", s);
            }
        }
        for wr in s.scheduled.iter() {
            if !wr.get().scheduled_objects.contains(self) {
                bail!("scheduled asymmetry in {:?}", s);
            }
        }
        for wr in s.located.iter() {
            if !wr.get().located_objects.contains(self) {
                bail!("located asymmetry in {:?}", s);
            }
            if !s.assigned.contains(wr) {
                bail!("located at not-assigned governor in {:?}", s);
            }
        }
        if !s.session.get().objects.contains(self) {
            bail!("session assymetry in {:?}", s);
        }

        // producer link symmetry and producer consistency
        if let Some(ref pr) = s.producer {
            let p = pr.get();
            if !p.outputs.contains(self) {
                bail!("object missing in producer {:?} outputs in {:?}", pr, s);
            }
            if s.state == DataObjectState::Unfinished && p.state == TaskState::Finished {
                bail!("producer finished state inconsistency in {:?}", s);
            }
            if s.state == DataObjectState::Finished && p.state != TaskState::Finished {
                bail!("producer not finished state inconsistency in {:?}", s);
            }
            if let Some(ref swr) = p.assigned {
                if !s.assigned.contains(swr) {
                    // The object must be passed: a single-argument `bail!` does not format.
                    bail!("not assigned to producer governor in {:?}", s);
                }
            }
        }
        // NOTE: "a Finished object without a producer holds `data`" is not checked; it does
        // not hold while a session is being cleared.

        // state consistency
        let state_ok = match s.state {
            DataObjectState::Unfinished => s.scheduled.len() <= 1 && s.assigned.len() <= 1,
            // NOTE: Can't check s.producer.is_some() in case the session is being destroyed.
            DataObjectState::Finished => {
                s.data.is_some() || (!s.located.is_empty() && !s.assigned.is_empty())
            }
            // NOTE: `s.data.is_none()` is not required: it does not hold when a session failed.
            DataObjectState::Removed => {
                s.located.is_empty() && s.scheduled.is_empty() && s.assigned.is_empty()
                    && s.finish_hooks.is_empty()
            }
        };
        if !state_ok {
            bail!("state inconsistency in {:?}", s);
        }

        // data consistency [DISABLED BECAUSE OF SCHEDULER TESTING MODE DOES NOT HOLD THIS, see testmode.rs]
        /*if let Some(ref d) = s.data {
            let size = s.info.size;
            if size != Some(d.len()) {
                bail!("Info size and data len mismatch in {:?} ({:?} vs {:?})", s, size, Some(d.len()));
            }
        }*/

        // finish hooks
        if !s.finish_hooks.is_empty() && s.state != DataObjectState::Unfinished {
            bail!("finish hooks for finished/removed object in {:?}", s);
        }

        // keepflag and empty assigned (via Removed state)
        // NOTE: Finished state already requires data or nonempty locations
        if s.client_keep && s.state == DataObjectState::Removed {
            bail!("client_keep flag on removed object {:?}", s);
        }

        // used or kept objects must be assigned when their producers are
        if (s.client_keep || !s.consumers.is_empty()) && s.assigned.is_empty()
            && s.state == DataObjectState::Unfinished
        {
            if let Some(ref prod) = s.producer {
                let p = prod.get();
                if p.state == TaskState::Assigned || p.state == TaskState::Running {
                    bail!(
                        "Unfinished object is not assigned when it's producer task is in {:?}",
                        s
                    );
                }
            }
        }
        Ok(())
    }
}
impl ::std::fmt::Debug for DataObjectRef {
    /// Render the handle as `DataObjectRef <id>`; only the object id is printed.
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        // Copy the id out so the inner borrow ends before formatting.
        let object_id = self.get().spec.id;
        write!(f, "DataObjectRef {}", object_id)
    }
}