pub struct Variable<G, C> { /* private fields */ }
A recursively defined collection.
The Variable struct supports differential dataflow programs that require more sophisticated
iterative patterns than singly recursive iteration. For example, in mutual recursion two
collections evolve simultaneously.
Examples
The following example is equivalent to the example for the Iterate trait.
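use timely::order::Product;
use timely::dataflow::Scope;
use differential_dataflow::input::Input;
use differential_dataflow::operators::iterate::Variable;

::timely::example(|scope| {
    // Input collection containing the numbers 1 through 9.
    let numbers = scope.new_collection_from(1 .. 10u32).1;
    scope.iterative::<u64,_,_>(|nested| {
        // Each trip around the loop advances the inner (u64) coordinate by one.
        let summary = Product::new(Default::default(), 1);
        // The variable's collection starts out equal to `numbers`.
        let (variable, collection) = Variable::new_from(numbers.enter(nested), summary);
        // Halve even numbers and keep odd ones; iterating drives every record to an odd number.
        let result = collection.map(|x| if x % 2 == 0 { x/2 } else { x })
                               .consolidate();
        // Bind the variable to the result, closing the loop.
        variable.set(result.clone());
        result.leave()
    });
})

Variables support iterative patterns that can be both more flexible and more efficient.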
Mutual recursion occurs when multiple variables are defined in the same iterative context and their definitions depend on one another. For example, odd numbers and even numbers can be determined from each other, iteratively.
use timely::order::Product;
use timely::dataflow::Scope;
use differential_dataflow::input::Input;
use differential_dataflow::operators::iterate::Variable;

::timely::example(|scope| {
    let numbers = scope.new_collection_from(10 .. 20u32).1;
    scope.iterative::<u64,_,_>(|nested| {
        let summary = Product::new(Default::default(), 1);
        // Seed one variable with the even inputs and another with the odd inputs.
        let (even_v, even) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 0), summary);
        let (odds_v, odds) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 1), summary);
        // Each variable is defined in terms of the other: decrementing a positive
        // even number yields an odd number, and vice versa.
        odds_v.set(even.clone().filter(|x| *x > 0).map(|x| x-1).concat(odds.clone()).distinct());
        even_v.set(odds.clone().filter(|x| *x > 0).map(|x| x-1).concat(even.clone()).distinct());
    });
})

Direct construction can be more efficient than iterate when you know how to determine
the changes to the initial collection directly, rather than simply adding that collection,
running your intended logic, and then subtracting the collection.
As an example, the logic in identifiers.rs looks for hash collisions and tweaks the salt
for all but one element in each group of colliding elements. Most elements do not collide,
and we don't need to circulate the non-colliding elements to confirm that they subtract away.
By iteratively developing a variable of the edits to the input, we can produce and circulate
a smaller volume of updates. This can be especially impactful when the initial collection is
large and the edits to perform are relatively small.
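To make the edit-volume argument concrete, here is a hypothetical, std-only Rust toy in the spirit of that description; it does not reproduce identifiers.rs and does not use differential dataflow. The function `collision_edits` and the deliberately weak `toy_hash` are assumptions made for illustration. Each round re-salts all but the smallest value in every colliding group, and at the end only the resulting edits (retract the old `(value, salt)`, insert the new one) are reported, so non-colliding values produce no updates at all.

```rust
use std::collections::BTreeMap;

// Hypothetical hash with only 16 buckets, so collisions are easy to construct.
fn toy_hash(value: u32, salt: u32) -> u32 {
    value.wrapping_add(salt.wrapping_mul(7)) % 16
}

/// Hypothetical helper: re-salts colliding values for up to `max_rounds` rounds
/// and returns only the edits, as ((value, salt), diff) pairs relative to the
/// initial state in which every value has salt 0.
fn collision_edits(values: &[u32], max_rounds: usize) -> Vec<((u32, u32), i64)> {
    // Every value starts with salt 0; BTreeMap keeps values in ascending order.
    let mut salts: BTreeMap<u32, u32> = values.iter().map(|&v| (v, 0)).collect();
    for _ in 0..max_rounds {
        // Group the current (value, salt) records by hash bucket.
        let mut buckets: BTreeMap<u32, Vec<u32>> = BTreeMap::new();
        for (&value, &salt) in &salts {
            buckets.entry(toy_hash(value, salt)).or_default().push(value);
        }
        // Keep the smallest value in each bucket; bump the salt of the others.
        let mut changed = false;
        for group in buckets.values() {
            for value in &group[1..] {
                *salts.get_mut(value).unwrap() += 1;
                changed = true;
            }
        }
        if !changed {
            break; // no collisions remain
        }
    }
    // Only values whose salt changed contribute edits: retract old, insert new.
    let mut edits = Vec::new();
    for (&value, &salt) in &salts {
        if salt != 0 {
            edits.push(((value, 0), -1));
            edits.push(((value, salt), 1));
        }
    }
    edits
}
```

With the values 0 through 9 plus 16 and 17, only 16 and 17 collide, so twelve records yield four edits. A re-salted record can collide again (here 16 first lands in 7's bucket), which is why the edits are developed over several rounds rather than computed once.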
Implementations
impl<G, C> Variable<G, C>
pub fn new(
    scope: &mut G,
    step: <G::Timestamp as Timestamp>::Summary,
) -> (Self, Collection<G, C>)
Creates a new, initially empty Variable and its associated Collection.
The collection should be used, along with other potentially recursive collections,
to define an output collection to which the variable is then set.
In an iterative context, each collection starts empty and is repeatedly updated by
the logic used to produce the collection its variable is bound to. This process
continues until no changes occur, at which point we have reached a fixed point (or
until the range of timestamps has been exhausted). Calling leave() on any collection
will produce its fixed point in the outer scope.
In a non-iterative scope the mechanics are the same, but the interpretation varies.
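The loop described above can be modelled outside of differential dataflow. The following std-only Rust sketch is an illustration under assumptions, not the library's API: the hypothetical `reachable` function treats one `BTreeSet` as the variable, starts it empty, recomputes it from the logic it is bound to (the roots plus one hop along `edges`), and stops at a fixed point or after `max_rounds`, which stands in for an exhausted timestamp range.

```rust
use std::collections::BTreeSet;

/// Hypothetical model of a variable created with `new()` and set to
/// "roots plus one hop from itself". Returns the final set and the number
/// of rounds that changed it.
fn reachable(roots: &[u32], edges: &[(u32, u32)], max_rounds: usize) -> (BTreeSet<u32>, usize) {
    // Like the collection returned by `new()`, the variable starts empty.
    let mut current: BTreeSet<u32> = BTreeSet::new();
    for round in 0..max_rounds {
        // The logic the variable is bound to: the roots plus every node one
        // hop away from the current contents.
        let mut next: BTreeSet<u32> = roots.iter().copied().collect();
        for &(src, dst) in edges {
            if current.contains(&src) {
                next.insert(dst);
            }
        }
        if next == current {
            // No changes: a fixed point has been reached.
            return (current, round);
        }
        current = next;
    }
    // The round budget ran out, like an exhausted timestamp range.
    (current, max_rounds)
}
```

With root 1 and edges 1→2, 2→3, 3→1, 4→5, the set grows to {1, 2, 3} over three changing rounds; with a budget of two rounds it stops early at {1, 2}.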
pub fn new_from(
    source: Collection<G, C>,
    step: <G::Timestamp as Timestamp>::Summary,
) -> (Self, Collection<G, C>)
Creates a new Variable and its associated Collection, initially source.
This method is a shortcut for a pattern that one can write manually with new(),
but which is easy enough to get wrong that the help is valuable.
This pattern uses a variable x to develop x = logic(x + source) - source,
which finds a fixed point x that satisfies x + source = logic(x + source).
The fixed point equals the repeated application of logic to source plus the
To implement the pattern one would create a new initially empty variable with new(),
then concatenate source into that collection, and use it as logic dictates.
Just before the variable is set to the result collection, source is subtracted.
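The manual pattern can be traced in plain Rust, using a `BTreeMap` from record to signed count as a stand-in for a collection. This is a sketch under that assumption, not differential dataflow code; `add`, `logic`, and `develop` are hypothetical names, and `logic` is the halving logic from the first example. `develop` starts `x` empty (as `new()` would) and repeatedly computes `logic(x + source) - source` until `x` stops changing.

```rust
use std::collections::BTreeMap;

// A collection modelled as record -> signed multiplicity (zero counts are dropped).
type Multiset = BTreeMap<u32, i64>;

/// Returns a + sign * b, removing records whose count becomes zero.
fn add(a: &Multiset, b: &Multiset, sign: i64) -> Multiset {
    let mut out = a.clone();
    for (&record, &count) in b {
        let total = out.get(&record).copied().unwrap_or(0) + sign * count;
        if total == 0 {
            out.remove(&record);
        } else {
            out.insert(record, total);
        }
    }
    out
}

/// The halving logic of the first example: halve even records, keep odd ones.
fn logic(input: &Multiset) -> Multiset {
    let mut out = Multiset::new();
    for (&record, &count) in input {
        let mapped = if record % 2 == 0 { record / 2 } else { record };
        *out.entry(mapped).or_insert(0) += count;
    }
    out.retain(|_, count| *count != 0);
    out
}

/// The manual pattern: x starts empty, and each round sets
/// x = logic(x + source) - source, stopping once x no longer changes.
fn develop(source: &Multiset, max_rounds: usize) -> Multiset {
    let mut x = Multiset::new();
    for _ in 0..max_rounds {
        let next = add(&logic(&add(&x, source, 1)), source, -1);
        if next == x {
            break;
        }
        x = next;
    }
    x
}
```

For `source` = 1 through 9, `x + source` settles at {1: 4, 3: 2, 5: 1, 7: 1, 9: 1}, which `logic` leaves unchanged, while `x` itself is {1: 3, 2: -1, 3: 1, 4: -1, 6: -1, 8: -1}: a collection with negative multiplicities.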
If using this pattern manually, it is important to bear in mind that the collection
that results from logic converges to its fixed point, but that once source is
subtracted the collection converges to this limit minus source, a collection that
may have records that accumulate to negative multiplicities, and for which the model
of them as “data sets” may break down. When applying non-linear operations like
reduce, be careful that they make sense when updates may have non-positive differences.
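A small std-only illustration of why non-linear operations need care, again assuming that a `BTreeMap` of signed counts models a collection (this is not the differential dataflow API). The hypothetical `distinct` keeps each record with a positive count once, a simplified stand-in for a reduce-based distinct. Applied to `x + source` it gives a different answer than applying it to `x` and `source` separately, because it discards the negative count in `x` that should cancel a record of `source`.

```rust
use std::collections::BTreeMap;

// A collection modelled as record -> signed multiplicity (zero counts are dropped).
type Multiset = BTreeMap<u32, i64>;

/// Returns a + b, removing records whose count becomes zero.
fn plus(a: &Multiset, b: &Multiset) -> Multiset {
    let mut out = a.clone();
    for (&record, &count) in b {
        let total = out.get(&record).copied().unwrap_or(0) + count;
        if total == 0 {
            out.remove(&record);
        } else {
            out.insert(record, total);
        }
    }
    out
}

/// Naive distinct: each record with a positive count appears once;
/// records with negative counts are silently dropped.
fn distinct(input: &Multiset) -> Multiset {
    input
        .iter()
        .filter(|&(_, &count)| count > 0)
        .map(|(&record, _)| (record, 1))
        .collect()
}
```

With `source` = {2: 1, 5: 1} and `x` = {2: -1, 3: 1}, `distinct(x + source)` is {3: 1, 5: 1}, but `distinct(x) + distinct(source)` is {2: 1, 3: 1, 5: 1}: the retraction of 2 is lost.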
Finally, implementing this pattern manually makes it possible to implement the logic
x = logic(x + source) - source more directly. If there is a mechanism other than
adding the source, applying the logic, and then subtracting the source, it is appropriate to use it.
For example, if the logic modifies only a few records, it is possible to produce this update
directly without using the backstop implementation this method provides.
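As a sketch of producing the update directly, again modelling collections as `BTreeMap`s of signed counts (an assumption, not the differential dataflow API): for the halving logic of the first example only even records change, so the change `logic(input) - input` can be written down by retracting each even record and inserting its half. The hypothetical `delta_direct` does that, `delta_wholesale` takes the add-apply-subtract route, and both consolidate to the same multiset. Only a single application of the logic is modelled, not the full fixed-point loop.

```rust
use std::collections::BTreeMap;

// A collection modelled as record -> signed multiplicity (zero counts are dropped).
type Multiset = BTreeMap<u32, i64>;

/// Adds signed updates into `acc`, removing records whose count becomes zero.
fn apply(mut acc: Multiset, updates: impl IntoIterator<Item = (u32, i64)>) -> Multiset {
    for (record, diff) in updates {
        let count = acc.get(&record).copied().unwrap_or(0) + diff;
        if count == 0 {
            acc.remove(&record);
        } else {
            acc.insert(record, count);
        }
    }
    acc
}

/// The halving logic: even records are halved, odd records are kept.
fn logic(input: &Multiset) -> Multiset {
    apply(Multiset::new(), input.iter().map(|(&k, &c)| (if k % 2 == 0 { k / 2 } else { k }, c)))
}

/// Add-apply-subtract: run the logic on every record, then subtract the input.
fn delta_wholesale(input: &Multiset) -> Multiset {
    apply(logic(input), input.iter().map(|(&k, &c)| (k, -c)))
}

/// Direct construction: only records the logic changes produce updates.
fn delta_direct(input: &Multiset) -> Vec<(u32, i64)> {
    let mut updates = Vec::new();
    for (&k, &c) in input {
        if k % 2 == 0 {
            updates.push((k, -c)); // retract the even record
            updates.push((k / 2, c)); // insert its half
        }
    }
    updates
}
```

For an input of mostly odd numbers such as {1, 3, 5, 7, 9, 11, 12}, the direct route emits just two updates ((12, -1) and (6, +1)), whereas the wholesale route processes every record before the unchanged ones cancel.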
pub fn set(self, result: Collection<G, C>)
Sets the definition of the Variable to a collection.
This method binds the Variable to be equal to the supplied collection,
which may be recursively defined in terms of the variable itself.
Auto Trait Implementations
impl<G, C> Freeze for Variable<G, C>
impl<G, C> !RefUnwindSafe for Variable<G, C>
impl<G, C> !Send for Variable<G, C>
impl<G, C> !Sync for Variable<G, C>
impl<G, C> Unpin for Variable<G, C>
where
    <<G as ScopeParent>::Timestamp as Timestamp>::Summary: Unpin,
    G: Unpin,
    <G as ScopeParent>::Timestamp: Unpin,
impl<G, C> UnsafeUnpin for Variable<G, C>
impl<G, C> !UnwindSafe for Variable<G, C>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
impl<'a, S, T> Semigroup<&'a S> for T
where
    T: Semigroup<S>,
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.