Struct differential_dataflow::input::InputSession

pub struct InputSession<T: Timestamp + Clone, D: Data, R: Diff> { /* fields omitted */ }

An input session wrapping a single timely dataflow capability.

Each timely dataflow message has a corresponding capability, which is a logical time in the timely dataflow system. Differential dataflow updates can happen at a much higher rate than timely dataflow's progress tracking infrastructure supports, because the logical times are promoted to data and updates are batched together. The InputSession type does this batching.
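
The batching described above can be pictured with a small stand-alone model. This is only a sketch: `ToyBatcher`, its `u64` times and `isize` weights, and the `push` and `drain` names are assumptions made for illustration, not the library's types or internals. It shows each update carrying its own logical time as data, so that updates at several times can leave as one batch.

```rust
/// An update triple: (data, logical time, weight change). The `u64` time and
/// `isize` weight are illustrative choices.
type Update<D> = (D, u64, isize);

/// Hypothetical toy batcher, not the library's implementation.
struct ToyBatcher<D> {
    time: u64,
    buffer: Vec<Update<D>>,
}

impl<D> ToyBatcher<D> {
    fn new() -> Self {
        ToyBatcher { time: 0, buffer: Vec::new() }
    }

    /// Buffers an update, stamping it with the current logical time.
    fn push(&mut self, data: D, diff: isize) {
        self.buffer.push((data, self.time, diff));
    }

    /// Moves the logical time forward for later updates.
    fn advance_to(&mut self, time: u64) {
        assert!(time >= self.time, "logical time must not go backwards");
        self.time = time;
    }

    /// Hands over everything buffered so far as a single batch.
    fn drain(&mut self) -> Vec<Update<D>> {
        std::mem::take(&mut self.buffer)
    }
}

/// Three updates at three logical times leave as one batch.
fn batching_demo() -> Vec<Update<&'static str>> {
    let mut batcher = ToyBatcher::new();
    batcher.push("a", 1);
    batcher.advance_to(1);
    batcher.push("b", 1);
    batcher.advance_to(2);
    batcher.push("a", -1);
    batcher.drain()
}
```

Because the time is part of each record, the batch returned by `batching_demo()` spans times 0, 1 and 2 without any per-time coordination.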

Examples

extern crate timely;
extern crate timely_communication;
extern crate differential_dataflow;

use timely_communication::Configuration;
use differential_dataflow::input::Input;

fn main() {
    ::timely::execute(Configuration::Thread, |worker| {

        let (mut handle, probe) = worker.dataflow(|scope| {
            // create input handle and collection.
            let (handle, data) = scope.new_collection_from(0 .. 10);
            let probe = data.map(|x| x * 2)
                            .inspect(|x| println!("{:?}", x))
                            .probe();
            (handle, probe)
        });

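        // Insert 3 at the initial time, then 5 at time 1.
        handle.insert(3);
        handle.advance_to(1);
        handle.insert(5);
        // Move the session to time 2 and flush so timely dataflow sees the updates.
        handle.advance_to(2);
        handle.flush();

        // Step the worker until no work remains at times before the session's time.
        while probe.less_than(handle.time()) {
            worker.step();
        }

        // Retract 5 at time 2, then advance and flush again.
        handle.remove(5);
        handle.advance_to(3);
        handle.flush();

        while probe.less_than(handle.time()) {
            worker.step();
        }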

    }).unwrap();
}

Methods

impl<T: Timestamp + Clone, D: Data> InputSession<T, D, isize>

Adds an element to the collection.

Removes an element from the collection.

impl<'a, T: Timestamp + Clone, D: Data, R: Diff> InputSession<T, D, R>

Creates a new session from a reference to an input handle.

Adds to the weight of an element in the collection.
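
One way to read adding, removing, and adjusting a weight is as a single operation on a multiset of weighted elements. The sketch below assumes that an insertion is a weight change of +1 and a removal is a weight change of -1, which matches the `isize` specialization above but is not confirmed by this page. `ToyCollection` and its method names are hypothetical, and the model ignores logical time entirely.

```rust
use std::collections::HashMap;

/// Hypothetical toy collection: a multiset stored as element -> weight.
struct ToyCollection {
    weights: HashMap<String, isize>,
}

impl ToyCollection {
    fn new() -> Self {
        ToyCollection { weights: HashMap::new() }
    }

    /// Adds `change` to the weight of `element`; entries that reach zero are dropped.
    fn update(&mut self, element: &str, change: isize) {
        let is_zero = {
            let weight = self.weights.entry(element.to_string()).or_insert(0);
            *weight += change;
            *weight == 0
        };
        if is_zero {
            self.weights.remove(element);
        }
    }

    /// Assumed encoding: an insertion is a weight change of +1.
    fn insert(&mut self, element: &str) {
        self.update(element, 1);
    }

    /// Assumed encoding: a removal is a weight change of -1.
    fn remove(&mut self, element: &str) {
        self.update(element, -1);
    }

    /// Current weight of `element`, or zero if it is absent.
    fn weight(&self, element: &str) -> isize {
        self.weights.get(element).cloned().unwrap_or(0)
    }
}
```

Under this reading, inserting an element twice gives it weight 2, and an insert followed by a remove cancels out and leaves no entry behind.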

Forces buffered data into the timely dataflow input, and advances its time to match that of the session.

It is important to call flush before expecting timely dataflow to report progress. Until this method is called, all updates may still be in internal buffers and not exposed to timely dataflow. Once the method is called, all buffers are flushed and timely dataflow is advised that some logical times are no longer possible.
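
The effect of flushing can be sketched with two toy structs. `ToyInput` stands in for the wrapped timely input and `ToySession` for the session; both are hypothetical simplifications, and passing the input to `flush` as an argument is a choice made only to keep the sketch short. The point is that nothing reaches the input, and the input's time does not move, until `flush` runs.

```rust
/// Hypothetical stand-in for the wrapped timely input.
struct ToyInput {
    received: Vec<(u32, u64, isize)>,
    time: u64,
}

/// Hypothetical stand-in for the session: a buffer plus a logical time.
struct ToySession {
    buffer: Vec<(u32, u64, isize)>,
    time: u64,
}

impl ToySession {
    fn new() -> Self {
        ToySession { buffer: Vec::new(), time: 0 }
    }

    /// Buffers an insertion at the session's current time.
    fn insert(&mut self, element: u32) {
        self.buffer.push((element, self.time, 1));
    }

    /// Changes only the session's own time.
    fn advance_to(&mut self, time: u64) {
        self.time = time;
    }

    /// Moves buffered updates into the input and brings its time up to the session's.
    fn flush(&mut self, input: &mut ToyInput) {
        input.received.extend(self.buffer.drain(..));
        input.time = self.time;
    }
}

/// Returns (records seen before, input time before, records seen after, input time after).
fn flush_demo() -> (usize, u64, usize, u64) {
    let mut input = ToyInput { received: Vec::new(), time: 0 };
    let mut session = ToySession::new();
    session.insert(3);
    session.advance_to(1);
    session.insert(5);
    session.advance_to(2);
    let before = (input.received.len(), input.time);
    session.flush(&mut input);
    (before.0, before.1, input.received.len(), input.time)
}
```

In this model the input sees no records and stays at time 0 before the flush, and sees both records at time 2 afterwards.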

Advances the logical time for future records.

Importantly, this method does not immediately inform timely dataflow of the change. This happens only when the session is dropped or flushed. It is not correct to use this time as a basis for a computation's step_while method unless the session has just been flushed.
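
The warning above can be illustrated with a toy model of the wait loop. Everything here is hypothetical: `ToyDataflow` reduces the input and the probe to two counters, and `wait_for` bounds the number of steps so that the stalled case terminates. It is a sketch of why waiting on an unflushed session time makes no progress, not a description of timely dataflow's scheduler.

```rust
/// Hypothetical model of the two times involved in waiting for progress.
struct ToyDataflow {
    /// The time the input has actually been told about (updated by a flush).
    input_time: u64,
    /// How far the computation has progressed.
    probe_time: u64,
}

impl ToyDataflow {
    /// One unit of work: the probe may catch up to the input's time, never pass it.
    fn step(&mut self) {
        if self.probe_time < self.input_time {
            self.probe_time += 1;
        }
    }

    /// True while work at times before `time` may still be outstanding.
    fn probe_less_than(&self, time: u64) -> bool {
        self.probe_time < time
    }
}

/// Steps until the probe reaches `time` or `max_steps` is used up.
/// Returns whether the probe reached `time`.
fn wait_for(dataflow: &mut ToyDataflow, time: u64, max_steps: usize) -> bool {
    for _ in 0..max_steps {
        if !dataflow.probe_less_than(time) {
            return true;
        }
        dataflow.step();
    }
    !dataflow.probe_less_than(time)
}

/// Returns (reached without flushing, reached after flushing).
fn advance_demo() -> (bool, bool) {
    let mut dataflow = ToyDataflow { input_time: 0, probe_time: 0 };
    // The session has advanced to time 2, but nothing has been flushed yet.
    let session_time = 2;
    let without_flush = wait_for(&mut dataflow, session_time, 100);
    // Model the flush: the input now learns about the session's time.
    dataflow.input_time = session_time;
    let after_flush = wait_for(&mut dataflow, session_time, 100);
    (without_flush, after_flush)
}
```

In the model, the first wait exhausts its step budget because the input is still at time 0, while the second completes once the flush has been applied. This is the reason the example at the top of the page calls flush before each wait loop.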

Reveals the current time of the session.

Reveals the current time of the session.

Closes the input, flushing and sealing the wrapped timely input.

Trait Implementations

impl<T: Timestamp + Clone, D: Data, R: Diff> Drop for InputSession<T, D, R>

Executes the destructor for this type.
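
Since the documentation above notes that timely dataflow is informed when the session is dropped or flushed, a flush-on-drop pattern is a natural fit. The following is a minimal sketch of that pattern using only the standard library. `ToySession`, the `Sink` alias, and the shared `Rc<RefCell<..>>` sink are assumptions made for illustration and say nothing about how the real destructor is written.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared sink standing in for the wrapped input (hypothetical).
type Sink = Rc<RefCell<Vec<(u32, u64, isize)>>>;

/// Hypothetical session that buffers updates and writes them to the sink.
struct ToySession {
    sink: Sink,
    buffer: Vec<(u32, u64, isize)>,
    time: u64,
}

impl ToySession {
    fn new(sink: Sink) -> Self {
        ToySession { sink, buffer: Vec::new(), time: 0 }
    }

    /// Buffers an insertion at the session's current time.
    fn insert(&mut self, element: u32) {
        self.buffer.push((element, self.time, 1));
    }

    /// Moves everything buffered so far into the sink.
    fn flush(&mut self) {
        self.sink.borrow_mut().extend(self.buffer.drain(..));
    }
}

impl Drop for ToySession {
    /// Flushes whatever is still buffered when the session goes out of scope.
    fn drop(&mut self) {
        self.flush();
    }
}

/// Returns (records in the sink before the drop, records after the drop).
fn drop_demo() -> (usize, usize) {
    let sink: Sink = Rc::new(RefCell::new(Vec::new()));
    let before;
    {
        let mut session = ToySession::new(sink.clone());
        session.insert(7);
        before = sink.borrow().len();
    } // `session` is dropped here, which triggers the flush.
    let after = sink.borrow().len();
    (before, after)
}
```

With this pattern a buffered update cannot be lost by forgetting an explicit flush, although an explicit flush is still needed whenever progress must be observed while the session is alive.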