1use std::collections::HashMap;
5
6use timely::dataflow::operators::{Probe, UnorderedInput};
7use timely::dataflow::{ProbeHandle, Scope, ScopeParent, Stream};
8use timely::progress::frontier::AntichainRef;
9use timely::progress::Timestamp;
10
11use differential_dataflow::lattice::Lattice;
12use differential_dataflow::operators::arrange::Arrange;
13use differential_dataflow::operators::Threshold;
14use differential_dataflow::trace::TraceReader;
15use differential_dataflow::AsCollection;
16
17use crate::operators::CardinalityOne;
18use crate::{Aid, Error, Rewind, TxData, Value};
19use crate::{AttributeConfig, IndexDirection, InputSemantics, QuerySupport};
20use crate::{RelationConfig, RelationHandle};
21use crate::{TraceKeyHandle, TraceValHandle};
22
23mod unordered_session;
24use unordered_session::UnorderedSession;
25
26pub struct Domain<T: Timestamp + Lattice> {
48 now_at: T,
50 last_advance: Vec<T>,
52 input_sessions: HashMap<String, UnorderedSession<T, (Value, Value), isize>>,
54 domain_probe: ProbeHandle<T>,
56 probed_source_count: usize,
60 pub attributes: HashMap<Aid, AttributeConfig>,
62 pub forward_count: HashMap<Aid, TraceKeyHandle<Value, T, isize>>,
64 pub forward_propose: HashMap<Aid, TraceValHandle<Value, Value, T, isize>>,
66 pub forward_validate: HashMap<Aid, TraceKeyHandle<(Value, Value), T, isize>>,
68 pub reverse_count: HashMap<Aid, TraceKeyHandle<Value, T, isize>>,
70 pub reverse_propose: HashMap<Aid, TraceValHandle<Value, Value, T, isize>>,
72 pub reverse_validate: HashMap<Aid, TraceKeyHandle<(Value, Value), T, isize>>,
74 pub relations: HashMap<Aid, RelationConfig>,
76 pub arrangements: HashMap<Aid, RelationHandle<T>>,
78}
79
80impl<T> Domain<T>
81where
82 T: Timestamp + Lattice + Rewind,
83{
84 pub fn new(start_at: T) -> Self {
86 Domain {
87 now_at: start_at,
88 last_advance: vec![<T as Lattice>::minimum()],
89 input_sessions: HashMap::new(),
90 domain_probe: ProbeHandle::new(),
91 probed_source_count: 0,
92 attributes: HashMap::new(),
93 forward_count: HashMap::new(),
94 forward_propose: HashMap::new(),
95 forward_validate: HashMap::new(),
96 reverse_count: HashMap::new(),
97 reverse_propose: HashMap::new(),
98 reverse_validate: HashMap::new(),
99 relations: HashMap::new(),
100 arrangements: HashMap::new(),
101 }
102 }
103
104 fn create_attribute<S: Scope + ScopeParent<Timestamp = T>>(
108 &mut self,
109 name: &str,
110 config: AttributeConfig,
111 pairs: &Stream<S, ((Value, Value), T, isize)>,
112 ) -> Result<(), Error> {
113 if self.attributes.contains_key(name) {
114 Err(Error::conflict(format!(
115 "An attribute of name {} already exists.",
116 name
117 )))
118 } else {
119 let tuples = match config.input_semantics {
120 InputSemantics::Raw => pairs.as_collection(),
121 InputSemantics::CardinalityOne => pairs.as_collection().cardinality_one(),
122 InputSemantics::CardinalityMany => pairs.as_collection().distinct(),
125 };
126
127 let tuples_reverse = tuples.map(|(e, v)| (v, e));
129
130 self.forward_propose.insert(
133 name.to_string(),
134 tuples.arrange_named(&format!("->Propose({})", &name)).trace,
135 );
136
137 if config.index_direction == IndexDirection::Both {
138 self.reverse_propose.insert(
139 name.to_string(),
140 tuples_reverse
141 .arrange_named(&format!("->_Propose({})", &name))
142 .trace,
143 );
144 }
145
146 if config.input_semantics != InputSemantics::CardinalityOne {
149 if config.query_support == QuerySupport::AdaptiveWCO {
152 self.forward_count.insert(
153 name.to_string(),
154 tuples
155 .map(|(k, _v)| (k, ()))
156 .arrange_named(&format!("->Count({})", name))
157 .trace,
158 );
159
160 if config.index_direction == IndexDirection::Both {
161 self.reverse_count.insert(
162 name.to_string(),
163 tuples_reverse
164 .map(|(k, _v)| (k, ()))
165 .arrange_named(&format!("->_Count({})", name))
166 .trace,
167 );
168 }
169 }
170
171 if config.query_support >= QuerySupport::Delta {
172 self.forward_validate.insert(
173 name.to_string(),
174 tuples
175 .map(|t| (t, ()))
176 .arrange_named(&format!("->Validate({})", &name))
177 .trace,
178 );
179
180 if config.index_direction == IndexDirection::Both {
181 self.reverse_validate.insert(
182 name.to_string(),
183 tuples_reverse
184 .map(|t| (t, ()))
185 .arrange_named(&format!("->_Validate({})", &name))
186 .trace,
187 );
188 }
189 }
190 }
191
192 self.attributes.insert(name.to_string(), config);
196
197 info!("Created attribute {}", name);
198
199 Ok(())
200 }
201 }
202
203 pub fn create_transactable_attribute<S: Scope<Timestamp = T>>(
205 &mut self,
206 name: &str,
207 config: AttributeConfig,
208 scope: &mut S,
209 ) -> Result<(), Error> {
210 let pairs = {
211 let ((handle, cap), pairs) = scope.new_unordered_input::<((Value, Value), T, isize)>();
212 let session = UnorderedSession::from(handle, cap);
213
214 self.input_sessions.insert(name.to_string(), session);
215
216 pairs
217 };
218
219 self.create_attribute(name, config, &pairs)?;
222
223 Ok(())
224 }
225
226 pub fn create_sourced_attribute<S: Scope + ScopeParent<Timestamp = T>>(
229 &mut self,
230 name: &str,
231 config: AttributeConfig,
232 pairs: &Stream<S, ((Value, Value), T, isize)>,
233 ) -> Result<(), Error> {
234 let source_pairs = if config.timeless {
242 pairs.to_owned()
243 } else {
244 self.probed_source_count += 1;
245 pairs.probe_with(&mut self.domain_probe)
246 };
247
248 self.create_attribute(name, config, &source_pairs)?;
249
250 Ok(())
251 }
252
253 pub fn register_arrangement(
255 &mut self,
256 name: String,
257 config: RelationConfig,
258 trace: RelationHandle<T>,
259 ) {
260 self.relations.insert(name.clone(), config);
261 self.arrangements.insert(name, trace);
262 }
263
264 pub fn transact(&mut self, tx_data: Vec<TxData>) -> Result<(), Error> {
266 for TxData(op, e, a, v, t) in tx_data {
268 match self.input_sessions.get_mut(&a) {
269 None => {
270 return Err(Error::not_found(format!("Attribute {} does not exist.", a)));
271 }
272 Some(handle) => match t {
273 None => handle.update((e, v), op),
274 Some(t) => handle.update_at((e, v), t.into(), op),
275 },
276 }
277 }
278
279 Ok(())
280 }
281
282 pub fn close_input(&mut self, name: String) -> Result<(), Error> {
284 match self.input_sessions.remove(&name) {
285 None => Err(Error::not_found(format!("Input {} does not exist.", name))),
286 Some(handle) => {
287 handle.close();
288 Ok(())
289 }
290 }
291 }
292
293 pub fn advance(&mut self) -> Result<(), Error> {
297 if self.probed_source_count() == 0 {
298 self.advance_traces(&[self.epoch().clone()])
300 } else {
301 let frontier = self
302 .domain_probe
303 .with_frontier(|frontier| (*frontier).to_vec());
304
305 if frontier.is_empty() {
306 self.advance_traces(&[self.epoch().clone()])
312 } else {
313 if !AntichainRef::new(&frontier).less_equal(self.epoch()) {
314 let max = frontier.iter().max().unwrap().clone();
318 self.advance_epoch(max)?;
319 }
320
321 self.advance_traces(&frontier)
322 }
323 }
324 }
325
326 pub fn advance_epoch(&mut self, next: T) -> Result<(), Error> {
330 if !self.now_at.less_equal(&next) {
331 Err(Error::conflict(format!(
333 "Domain is at {:?}, you attempted to rewind to {:?}.",
334 &self.now_at, &next
335 )))
336 } else if !self.now_at.eq(&next) {
337 trace!("Advancing domain epoch to {:?} ", next);
338
339 for handle in self.input_sessions.values_mut() {
340 handle.advance_to(next.clone());
341 handle.flush();
342 }
343 self.now_at = next;
344
345 Ok(())
346 } else {
347 Ok(())
348 }
349 }
350
351 pub fn advance_traces(&mut self, frontier: &[T]) -> Result<(), Error> {
354 let last_advance = AntichainRef::new(&self.last_advance);
355
356 if frontier.iter().any(|t| last_advance.less_than(t)) {
357 trace!("Advancing traces to {:?}", frontier);
358
359 self.last_advance = frontier.to_vec();
360 let frontier = AntichainRef::new(frontier);
361
362 for (aid, config) in self.attributes.iter() {
363 if let Some(ref trace_slack) = config.trace_slack {
364 let slacking_frontier = frontier
365 .iter()
366 .map(|t| t.rewind(trace_slack.clone().into()))
367 .collect::<Vec<T>>();;
368
369 if let Some(trace) = self.forward_count.get_mut(aid) {
370 trace.advance_by(&slacking_frontier);
371 trace.distinguish_since(&slacking_frontier);
372 }
373
374 if let Some(trace) = self.forward_propose.get_mut(aid) {
375 trace.advance_by(&slacking_frontier);
376 trace.distinguish_since(&slacking_frontier);
377 }
378
379 if let Some(trace) = self.forward_validate.get_mut(aid) {
380 trace.advance_by(&slacking_frontier);
381 trace.distinguish_since(&slacking_frontier);
382 }
383
384 if let Some(trace) = self.reverse_count.get_mut(aid) {
385 trace.advance_by(&slacking_frontier);
386 trace.distinguish_since(&slacking_frontier);
387 }
388
389 if let Some(trace) = self.reverse_propose.get_mut(aid) {
390 trace.advance_by(&slacking_frontier);
391 trace.distinguish_since(&slacking_frontier);
392 }
393
394 if let Some(trace) = self.reverse_validate.get_mut(aid) {
395 trace.advance_by(&slacking_frontier);
396 trace.distinguish_since(&slacking_frontier);
397 }
398 }
399 }
400
401 for (name, config) in self.relations.iter() {
402 if let Some(ref trace_slack) = config.trace_slack {
403 let slacking_frontier = frontier
404 .iter()
405 .map(|t| t.rewind(trace_slack.clone().into()))
406 .collect::<Vec<T>>();
407
408 let trace = self.arrangements.get_mut(name).unwrap_or_else(|| {
409 panic!("Configuration available for unknown relation {}", name)
410 });
411
412 trace.advance_by(&slacking_frontier);
413 trace.distinguish_since(&slacking_frontier);
414 }
415 }
416 }
417
418 Ok(())
419 }
420
421 pub fn domain_probe(&self) -> &ProbeHandle<T> {
423 &self.domain_probe
424 }
425
426 pub fn epoch(&self) -> &T {
428 &self.now_at
429 }
430
431 pub fn probed_source_count(&self) -> usize {
433 self.probed_source_count
434 }
435
436 pub fn dominates(&self, frontier: AntichainRef<T>) -> bool {
438 if self.probed_source_count() == 0 {
444 frontier.less_than(self.epoch())
445 } else if frontier.is_empty() {
446 false
447 } else {
448 self.domain_probe().with_frontier(|domain_frontier| {
449 domain_frontier.iter().all(|t| frontier.less_than(t))
450 })
451 }
452 }
453}