// declarative_dataflow/domain/mod.rs

//! Logic for working with attributes that share a common timestamp
//! semantics.

4use std::collections::HashMap;
5
6use timely::dataflow::operators::{Probe, UnorderedInput};
7use timely::dataflow::{ProbeHandle, Scope, ScopeParent, Stream};
8use timely::progress::frontier::AntichainRef;
9use timely::progress::Timestamp;
10
11use differential_dataflow::lattice::Lattice;
12use differential_dataflow::operators::arrange::Arrange;
13use differential_dataflow::operators::Threshold;
14use differential_dataflow::trace::TraceReader;
15use differential_dataflow::AsCollection;
16
17use crate::operators::CardinalityOne;
18use crate::{Aid, Error, Rewind, TxData, Value};
19use crate::{AttributeConfig, IndexDirection, InputSemantics, QuerySupport};
20use crate::{RelationConfig, RelationHandle};
21use crate::{TraceKeyHandle, TraceValHandle};
22
23mod unordered_session;
24use unordered_session::UnorderedSession;
25
26/// A domain manages attributes that share a timestamp semantics. Each
27/// attribute within a domain can be either fed from an external
28/// system, or from user transactions. The former are referred to as
29/// *sourced*, the latter as *transactable* attributes.
30///
31/// Both types of input must make sure not to block overall domain
32/// progress, s.t. results can be revealed and traces can be
33/// compacted. For attributes with an opinion on time, users and
34/// source operators are required to regularly downgrade their
35/// capabilities. As they do so, the domain frontier advances.
36///
37/// Some attributes do not care about time. Such attributes want their
38/// information to be immediately available to all
39/// queries. Conceptually, they want all their inputs to happen at
40/// t0. This is however not a practical solution, because holding
41/// capabilities for t0 in perpetuity completely stalls monotemporal
42/// domains and prevents trace compaction in multitemporal ones. We
43/// refer to this type of attributes as *timeless*. Instead, timeless
44/// attributes must be automatically advanced in lockstep with a
45/// high-watermark of all timeful domain inputs. This ensures that
46/// they will never block overall progress.
47pub struct Domain<T: Timestamp + Lattice> {
48    /// The current input epoch.
49    now_at: T,
50    /// Last trace advance.
51    last_advance: Vec<T>,
52    /// Input handles to attributes in this domain.
53    input_sessions: HashMap<String, UnorderedSession<T, (Value, Value), isize>>,
54    /// The probe keeping track of source progress in this domain.
55    domain_probe: ProbeHandle<T>,
56    /// Maintaining the number of probed sources allows us to
57    /// distinguish between a domain without sources, and one where
58    /// sources have ceased producing inputs.
59    probed_source_count: usize,
60    /// Configurations for attributes in this domain.
61    pub attributes: HashMap<Aid, AttributeConfig>,
62    /// Forward count traces.
63    pub forward_count: HashMap<Aid, TraceKeyHandle<Value, T, isize>>,
64    /// Forward propose traces.
65    pub forward_propose: HashMap<Aid, TraceValHandle<Value, Value, T, isize>>,
66    /// Forward validate traces.
67    pub forward_validate: HashMap<Aid, TraceKeyHandle<(Value, Value), T, isize>>,
68    /// Reverse count traces.
69    pub reverse_count: HashMap<Aid, TraceKeyHandle<Value, T, isize>>,
70    /// Reverse propose traces.
71    pub reverse_propose: HashMap<Aid, TraceValHandle<Value, Value, T, isize>>,
72    /// Reverse validate traces.
73    pub reverse_validate: HashMap<Aid, TraceKeyHandle<(Value, Value), T, isize>>,
74    /// Configuration for relations in this domain.
75    pub relations: HashMap<Aid, RelationConfig>,
76    /// Relation traces.
77    pub arrangements: HashMap<Aid, RelationHandle<T>>,
78}
79
80impl<T> Domain<T>
81where
82    T: Timestamp + Lattice + Rewind,
83{
84    /// Creates a new domain.
85    pub fn new(start_at: T) -> Self {
86        Domain {
87            now_at: start_at,
88            last_advance: vec![<T as Lattice>::minimum()],
89            input_sessions: HashMap::new(),
90            domain_probe: ProbeHandle::new(),
91            probed_source_count: 0,
92            attributes: HashMap::new(),
93            forward_count: HashMap::new(),
94            forward_propose: HashMap::new(),
95            forward_validate: HashMap::new(),
96            reverse_count: HashMap::new(),
97            reverse_propose: HashMap::new(),
98            reverse_validate: HashMap::new(),
99            relations: HashMap::new(),
100            arrangements: HashMap::new(),
101        }
102    }
103
104    /// Creates an attribute from a stream of (key,value)
105    /// pairs. Applies operators to enforce input semantics, registers
106    /// the attribute configuration, and installs appropriate indices.
107    fn create_attribute<S: Scope + ScopeParent<Timestamp = T>>(
108        &mut self,
109        name: &str,
110        config: AttributeConfig,
111        pairs: &Stream<S, ((Value, Value), T, isize)>,
112    ) -> Result<(), Error> {
113        if self.attributes.contains_key(name) {
114            Err(Error::conflict(format!(
115                "An attribute of name {} already exists.",
116                name
117            )))
118        } else {
119            let tuples = match config.input_semantics {
120                InputSemantics::Raw => pairs.as_collection(),
121                InputSemantics::CardinalityOne => pairs.as_collection().cardinality_one(),
122                // Ensure that redundant (e,v) pairs don't cause
123                // misleading proposals during joining.
124                InputSemantics::CardinalityMany => pairs.as_collection().distinct(),
125            };
126
127            // @TODO should only create this if used later
128            let tuples_reverse = tuples.map(|(e, v)| (v, e));
129
130            // Propose traces are used in general, whereas the other
131            // indices are only relevant to Hector.
132            self.forward_propose.insert(
133                name.to_string(),
134                tuples.arrange_named(&format!("->Propose({})", &name)).trace,
135            );
136
137            if config.index_direction == IndexDirection::Both {
138                self.reverse_propose.insert(
139                    name.to_string(),
140                    tuples_reverse
141                        .arrange_named(&format!("->_Propose({})", &name))
142                        .trace,
143                );
144            }
145
146            // CardinalityOne is a special case, because count,
147            // propose, and validate are all essentially the same.
148            if config.input_semantics != InputSemantics::CardinalityOne {
149                // Count traces are only required for use in
150                // worst-case optimal joins.
151                if config.query_support == QuerySupport::AdaptiveWCO {
152                    self.forward_count.insert(
153                        name.to_string(),
154                        tuples
155                            .map(|(k, _v)| (k, ()))
156                            .arrange_named(&format!("->Count({})", name))
157                            .trace,
158                    );
159
160                    if config.index_direction == IndexDirection::Both {
161                        self.reverse_count.insert(
162                            name.to_string(),
163                            tuples_reverse
164                                .map(|(k, _v)| (k, ()))
165                                .arrange_named(&format!("->_Count({})", name))
166                                .trace,
167                        );
168                    }
169                }
170
171                if config.query_support >= QuerySupport::Delta {
172                    self.forward_validate.insert(
173                        name.to_string(),
174                        tuples
175                            .map(|t| (t, ()))
176                            .arrange_named(&format!("->Validate({})", &name))
177                            .trace,
178                    );
179
180                    if config.index_direction == IndexDirection::Both {
181                        self.reverse_validate.insert(
182                            name.to_string(),
183                            tuples_reverse
184                                .map(|t| (t, ()))
185                                .arrange_named(&format!("->_Validate({})", &name))
186                                .trace,
187                        );
188                    }
189                }
190            }
191
192            // This is crucial. If we forget to install the attribute
193            // configuration, its traces will be ignored when
194            // advancing the domain.
195            self.attributes.insert(name.to_string(), config);
196
197            info!("Created attribute {}", name);
198
199            Ok(())
200        }
201    }
202
203    /// Creates an attribute that can be transacted upon by clients.
204    pub fn create_transactable_attribute<S: Scope<Timestamp = T>>(
205        &mut self,
206        name: &str,
207        config: AttributeConfig,
208        scope: &mut S,
209    ) -> Result<(), Error> {
210        let pairs = {
211            let ((handle, cap), pairs) = scope.new_unordered_input::<((Value, Value), T, isize)>();
212            let session = UnorderedSession::from(handle, cap);
213
214            self.input_sessions.insert(name.to_string(), session);
215
216            pairs
217        };
218
219        // We do not want to probe transactable attributes, because
220        // the domain epoch is authoritative for them.
221        self.create_attribute(name, config, &pairs)?;
222
223        Ok(())
224    }
225
226    /// Creates an attribute that is controlled by a source and thus
227    /// can not be transacted upon by clients.
228    pub fn create_sourced_attribute<S: Scope + ScopeParent<Timestamp = T>>(
229        &mut self,
230        name: &str,
231        config: AttributeConfig,
232        pairs: &Stream<S, ((Value, Value), T, isize)>,
233    ) -> Result<(), Error> {
234        // We need to install a probe on source-fed attributes in
235        // order to determine their progress.
236
237        // We do not want to probe timeless attributes.
238        // Sources of timeless attributes either are not able to or do not
239        // want to provide valid domain timestamps.
240        // Forcing to probe them would stall progress in the system.
241        let source_pairs = if config.timeless {
242            pairs.to_owned()
243        } else {
244            self.probed_source_count += 1;
245            pairs.probe_with(&mut self.domain_probe)
246        };
247
248        self.create_attribute(name, config, &source_pairs)?;
249
250        Ok(())
251    }
252
253    /// Inserts a new named relation.
254    pub fn register_arrangement(
255        &mut self,
256        name: String,
257        config: RelationConfig,
258        trace: RelationHandle<T>,
259    ) {
260        self.relations.insert(name.clone(), config);
261        self.arrangements.insert(name, trace);
262    }
263
264    /// Transact data into one or more inputs.
265    pub fn transact(&mut self, tx_data: Vec<TxData>) -> Result<(), Error> {
266        // @TODO do this smarter, e.g. grouped by handle
267        for TxData(op, e, a, v, t) in tx_data {
268            match self.input_sessions.get_mut(&a) {
269                None => {
270                    return Err(Error::not_found(format!("Attribute {} does not exist.", a)));
271                }
272                Some(handle) => match t {
273                    None => handle.update((e, v), op),
274                    Some(t) => handle.update_at((e, v), t.into(), op),
275                },
276            }
277        }
278
279        Ok(())
280    }
281
282    /// Closes and drops an existing input.
283    pub fn close_input(&mut self, name: String) -> Result<(), Error> {
284        match self.input_sessions.remove(&name) {
285            None => Err(Error::not_found(format!("Input {} does not exist.", name))),
286            Some(handle) => {
287                handle.close();
288                Ok(())
289            }
290        }
291    }
292
293    /// Advances the domain to the current domain frontier, thus
294    /// allowing traces to compact. All domain input handles are
295    /// forwarded up to the frontier, so as not to stall progress.
296    pub fn advance(&mut self) -> Result<(), Error> {
297        if self.probed_source_count() == 0 {
298            // No sources registered.
299            self.advance_traces(&[self.epoch().clone()])
300        } else {
301            let frontier = self
302                .domain_probe
303                .with_frontier(|frontier| (*frontier).to_vec());
304
305            if frontier.is_empty() {
306                // Even if all sources dropped their capabilities we
307                // still want to advance all traces to the current
308                // epoch, s.t. user created attributes are
309                // continuously advanced and compacted.
310
311                self.advance_traces(&[self.epoch().clone()])
312            } else {
313                if !AntichainRef::new(&frontier).less_equal(self.epoch()) {
314                    // Input handles have fallen behind the sources and need
315                    // to be advanced, such as not to block progress.
316
317                    let max = frontier.iter().max().unwrap().clone();
318                    self.advance_epoch(max)?;
319                }
320
321                self.advance_traces(&frontier)
322            }
323        }
324    }
325
326    /// Advances the domain epoch. The domain epoch can be in advance
327    /// of or lag behind the domain frontier. It is used by timeless
328    /// attributes to avoid stalling timeful inputs.
329    pub fn advance_epoch(&mut self, next: T) -> Result<(), Error> {
330        if !self.now_at.less_equal(&next) {
331            // We can't rewind time.
332            Err(Error::conflict(format!(
333                "Domain is at {:?}, you attempted to rewind to {:?}.",
334                &self.now_at, &next
335            )))
336        } else if !self.now_at.eq(&next) {
337            trace!("Advancing domain epoch to {:?} ", next);
338
339            for handle in self.input_sessions.values_mut() {
340                handle.advance_to(next.clone());
341                handle.flush();
342            }
343            self.now_at = next;
344
345            Ok(())
346        } else {
347            Ok(())
348        }
349    }
350
351    /// Advances domain traces up to the specified frontier minus
352    /// their configured slack.
353    pub fn advance_traces(&mut self, frontier: &[T]) -> Result<(), Error> {
354        let last_advance = AntichainRef::new(&self.last_advance);
355
356        if frontier.iter().any(|t| last_advance.less_than(t)) {
357            trace!("Advancing traces to {:?}", frontier);
358
359            self.last_advance = frontier.to_vec();
360            let frontier = AntichainRef::new(frontier);
361
362            for (aid, config) in self.attributes.iter() {
363                if let Some(ref trace_slack) = config.trace_slack {
364                    let slacking_frontier = frontier
365                        .iter()
366                        .map(|t| t.rewind(trace_slack.clone().into()))
367                        .collect::<Vec<T>>();;
368
369                    if let Some(trace) = self.forward_count.get_mut(aid) {
370                        trace.advance_by(&slacking_frontier);
371                        trace.distinguish_since(&slacking_frontier);
372                    }
373
374                    if let Some(trace) = self.forward_propose.get_mut(aid) {
375                        trace.advance_by(&slacking_frontier);
376                        trace.distinguish_since(&slacking_frontier);
377                    }
378
379                    if let Some(trace) = self.forward_validate.get_mut(aid) {
380                        trace.advance_by(&slacking_frontier);
381                        trace.distinguish_since(&slacking_frontier);
382                    }
383
384                    if let Some(trace) = self.reverse_count.get_mut(aid) {
385                        trace.advance_by(&slacking_frontier);
386                        trace.distinguish_since(&slacking_frontier);
387                    }
388
389                    if let Some(trace) = self.reverse_propose.get_mut(aid) {
390                        trace.advance_by(&slacking_frontier);
391                        trace.distinguish_since(&slacking_frontier);
392                    }
393
394                    if let Some(trace) = self.reverse_validate.get_mut(aid) {
395                        trace.advance_by(&slacking_frontier);
396                        trace.distinguish_since(&slacking_frontier);
397                    }
398                }
399            }
400
401            for (name, config) in self.relations.iter() {
402                if let Some(ref trace_slack) = config.trace_slack {
403                    let slacking_frontier = frontier
404                        .iter()
405                        .map(|t| t.rewind(trace_slack.clone().into()))
406                        .collect::<Vec<T>>();
407
408                    let trace = self.arrangements.get_mut(name).unwrap_or_else(|| {
409                        panic!("Configuration available for unknown relation {}", name)
410                    });
411
412                    trace.advance_by(&slacking_frontier);
413                    trace.distinguish_since(&slacking_frontier);
414                }
415            }
416        }
417
418        Ok(())
419    }
420
421    /// Returns a handle to the domain's input probe.
422    pub fn domain_probe(&self) -> &ProbeHandle<T> {
423        &self.domain_probe
424    }
425
426    /// Reports the current input epoch.
427    pub fn epoch(&self) -> &T {
428        &self.now_at
429    }
430
431    /// Reports the number of probed (timeful) sources in the domain.
432    pub fn probed_source_count(&self) -> usize {
433        self.probed_source_count
434    }
435
436    /// Returns true iff the frontier dominates all domain inputs.
437    pub fn dominates(&self, frontier: AntichainRef<T>) -> bool {
438        // We must distinguish the scenario where the internal domain
439        // has no sources from one where all its internal sources have
440        // dropped their capabilities. We do this by checking the
441        // probed_source_count of the domain.
442
443        if self.probed_source_count() == 0 {
444            frontier.less_than(self.epoch())
445        } else if frontier.is_empty() {
446            false
447        } else {
448            self.domain_probe().with_frontier(|domain_frontier| {
449                domain_frontier.iter().all(|t| frontier.less_than(t))
450            })
451        }
452    }
453}