differential_dataflow/trace/
description.rs

//! Descriptions of intervals of partially ordered times.
//!
//! A description is intended to provide an unambiguous characterization of a batch of
//! updates. We do assume that these updates are in the context of a known computation and
//! known input, so there is a well-defined "correct answer" for the full set of updates.
//!
//! ```ignore
//!      full = { (data, time, diff) }
//! ```
//!
//! Our aim with a description is to specify a subset of these updates unambiguously.
//!
//! Each description contains three frontiers, sets of mutually incomparable partially ordered
//! times. The first two frontiers are `lower` and `upper`, and they indicate the subset of
//! `full` represented in the batch: those updates whose times are greater than or equal to some
//! element of `lower` but not greater than or equal to any element of `upper`.
//!
//! ```ignore
//!     subset = { (data, time, diff) in full | lower.any(|t| t.less_equal(time)) &&
//!                                            !upper.any(|t| t.less_equal(time)) }
//! ```
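//!
//! As a small worked instance, assume totally ordered integer times (an illustrative choice;
//! nothing in this module requires it), with `lower = {2}` and `upper = {5}`. The updates
//! shown are made up for the example.
//!
//! ```ignore
//!     full   = { (a, 1, +1), (a, 2, +1), (b, 4, -1), (b, 5, +1) }
//!     subset = { (a, 2, +1), (b, 4, -1) }    // exactly the updates with 2 <= time < 5
//! ```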
//!
//! The third frontier `since` is used to indicate that the times presented by the batch may
//! no longer reflect the values in `subset` above. Although the updates are precisely those
//! bounded by `lower` and `upper`, we may have *advanced* some of the times.
//!
//! The guarantee provided by a batch is that for any time greater than or equal to some element
//! of `since`, the accumulated weight of batch updates at times less than or equal to that time
//! is identical to the accumulated weight of updates from `full` at times greater than or equal
//! to some element of `lower`, not greater than or equal to any element of `upper`, and less
//! than or equal to the query time.
//!
//! ```ignore
//!     for all times t1:
//!
//!        if since.any(|t2| t2.less_equal(t1)) then:
//!
//!            for all data:
//!
//!                sum x where (data, t2, x) in batch and t2.less_equal(t1)
//!            ==
//!                sum x where (data, t2, x) in full and  t2.less_equal(t1)
//!                                                  and  lower.any(|t3| t3.less_equal(t2))
//!                                                  and !upper.any(|t3| t3.less_equal(t2))
//! ```
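//!
//! To illustrate with hypothetical values, assume integer times, `lower = {2}`, `upper = {5}`,
//! `since = {4}`, and suppose the updates of `full` within those bounds are
//! `{ (a, 2, +1), (b, 4, -1) }`. A batch that advances each time up to `since` could then hold:
//!
//! ```ignore
//!     batch = { (a, 4, +1), (b, 4, -1) }
//!
//!     t1 = 4:  a: batch sums to +1, full (within bounds) sums to +1
//!              b: batch sums to -1, full (within bounds) sums to -1    // equal, as 4 <= t1
//!     t1 = 3:  a: batch sums to  0, full (within bounds) sums to +1    // no guarantee, as !(4 <= 3)
//! ```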
//!
//! Importantly, this equality makes no other guarantees about the contents of the batch when
//! one iterates through it. There are some consequences of the math that can be relied upon,
//! though.
//!
//! The most important consequence is that when `since <= lower` the contents of the batch
//! must be identical to the updates in `subset`. If it is ever the case that `since` is
//! in advance of `lower`, consumers of the batch must take care not to use the times
//! observed in the batch without ensuring that they are appropriately advanced (typically by
//! `since`). Failing to do so may produce updates that are not in advance of `since`, which
//! will often be a logic bug, as `since` does not advance without a corresponding advance in
//! times at which data may possibly be sent.

use abomonation_derive::Abomonation;
use timely::{PartialOrder, progress::Antichain};
use serde::{Serialize, Deserialize};

/// Describes an interval of partially ordered times.
///
/// A `Description` indicates a set of partially ordered times, and a moment at which they are
/// observed. The `lower` and `upper` frontiers bound the times contained within, and the `since`
/// frontier indicates a moment at which the times were observed. If `since` is strictly in
/// advance of `lower`, the contained times may be "advanced" to times which appear equivalent
/// from the perspective of any time after `since`.
#[derive(Clone, Debug, Abomonation, Serialize, Deserialize)]
pub struct Description<Time> {
    /// Lower frontier of contained updates.
    lower: Antichain<Time>,
    /// Upper frontier of contained updates.
    upper: Antichain<Time>,
    /// Frontier used for update compaction.
    since: Antichain<Time>,
}

impl<Time: PartialOrder+Clone> Description<Time> {
    /// Returns a new description from its component parts.
    ///
    /// Panics if `lower` is empty. An empty `upper` is permitted.
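    ///
    /// A minimal usage sketch, assuming `u64` times and that this type is reachable at
    /// `differential_dataflow::trace::description::Description` (the path is inferred from this
    /// file's location, so treat it as an assumption). The second call uses an empty `upper`,
    /// the case that the commented-out assertion in the body leaves open.
    ///
    /// ```ignore
    /// use timely::progress::Antichain;
    /// use differential_dataflow::trace::description::Description;
    ///
    /// // Updates at times in [0, 10), with times not yet advanced.
    /// let bounded = Description::new(
    ///     Antichain::from_elem(0u64),
    ///     Antichain::from_elem(10u64),
    ///     Antichain::from_elem(0u64),
    /// );
    /// assert_eq!(bounded.upper().elements(), &[10u64]);
    ///
    /// // An empty `upper` is accepted; an empty `lower` would trip the assertion in `new`.
    /// let unbounded = Description::new(
    ///     Antichain::from_elem(0u64),
    ///     Antichain::new(),
    ///     Antichain::from_elem(0u64),
    /// );
    /// assert!(unbounded.upper().elements().is_empty());
    /// ```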
    pub fn new(lower: Antichain<Time>, upper: Antichain<Time>, since: Antichain<Time>) -> Self {
        assert!(!lower.elements().is_empty());    // this should always be true.
        // assert!(upper.len() > 0);            // this may not always be true.
        Description {
            lower,
            upper,
            since,
        }
    }
}

impl<Time> Description<Time> {
    /// The lower envelope for times in the interval.
    pub fn lower(&self) -> &Antichain<Time> { &self.lower }
    /// The upper envelope for times in the interval.
    pub fn upper(&self) -> &Antichain<Time> { &self.upper }
    /// Times from whose future the interval may be observed.
    pub fn since(&self) -> &Antichain<Time> { &self.since }
}

impl<Time: PartialEq> PartialEq for Description<Time> {
    fn eq(&self, other: &Self) -> bool {
        self.lower.eq(other.lower())
            && self.upper.eq(other.upper())
            && self.since.eq(other.since())
    }
}

impl<Time: Eq> Eq for Description<Time> {}