Struct differential_dataflow::operators::arrange::agent::TraceAgent [−][src]
A TraceReader wrapper which can be imported into other dataflows.
The TraceAgent is the default trace type produced by arranged, and it can be extracted
from the dataflow in which it was defined, and imported into other dataflows.
Implementations
impl<Tr> TraceAgent<Tr> where
Tr: TraceReader,
Tr::Time: Timestamp + Lattice,
[src]
Tr: TraceReader,
Tr::Time: Timestamp + Lattice,
pub fn new(
trace: Tr,
operator: OperatorInfo,
logging: Option<Logger>
) -> (Self, TraceWriter<Tr>) where
Tr: Trace,
Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
[src]
trace: Tr,
operator: OperatorInfo,
logging: Option<Logger>
) -> (Self, TraceWriter<Tr>) where
Tr: Trace,
Tr::Batch: Batch<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Creates a new agent from a trace reader.
pub fn new_listener(
&mut self,
activator: Activator
) -> Rc<(Activator, RefCell<VecDeque<TraceReplayInstruction<Tr>>>)>
[src]
&mut self,
activator: Activator
) -> Rc<(Activator, RefCell<VecDeque<TraceReplayInstruction<Tr>>>)>
Attaches a new shared queue to the trace.
The queue is immediately populated with the existing historical batches from the trace, and until the reference
is dropped the queue will receive new batches as they are produced by the source arrange operator.
impl<Tr> TraceAgent<Tr> where
Tr: TraceReader + 'static,
Tr::Time: Lattice + Ord + Clone + 'static,
[src]
Tr: TraceReader + 'static,
Tr::Time: Lattice + Ord + Clone + 'static,
pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>> where
G: Scope<Timestamp = Tr::Time>,
Tr::Time: Timestamp,
[src]
G: Scope<Timestamp = Tr::Time>,
Tr::Time: Timestamp,
Copies an existing collection into the supplied scope.
This method creates an Arranged collection that should appear indistinguishable from applying arrange
directly to the source collection brought into the local scope. The only caveat is that the initial state
of the collection is its current state, and updates occur from this point forward. The historical changes
the collection experienced in the past are accumulated, and the distinctions from the initial collection
are no longer evident.
The current behavior is that the introduced collection accumulates updates to some times less than or equal
to self.get_logical_compaction(). There is not currently a guarantee that the updates are accumulated to
the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
the historical collection may move through configurations that did not actually occur, even if eventually
arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
the intermediate computation could do something that the original computation did not, like diverge.
I would expect the semantics to improve to “updates are advanced to self.get_logical_compaction()”, which
means the computation will run as if starting from exactly this frontier. It is not currently clear whose
responsibility this should be (the trace/batch should only reveal these times, or an operator should know
to advance times before using them).
Examples
extern crate timely; extern crate differential_dataflow; use timely::Config; use differential_dataflow::input::Input; use differential_dataflow::operators::arrange::ArrangeBySelf; use differential_dataflow::operators::reduce::Reduce; use differential_dataflow::trace::Trace; use differential_dataflow::trace::implementations::ord::OrdValSpine; fn main() { ::timely::execute(Config::thread(), |worker| { // create a first dataflow let mut trace = worker.dataflow::<u32,_,_>(|scope| { // create input handle and collection. scope.new_collection_from(0 .. 10).1 .arrange_by_self() .trace }); // do some work. worker.step(); worker.step(); // create a second dataflow worker.dataflow(move |scope| { trace.import(scope) .reduce(move |_key, src, dst| dst.push((*src[0].0, 1))); }); }).unwrap(); }
pub fn import_named<G>(
&mut self,
scope: &G,
name: &str
) -> Arranged<G, TraceAgent<Tr>> where
G: Scope<Timestamp = Tr::Time>,
Tr::Time: Timestamp,
[src]
&mut self,
scope: &G,
name: &str
) -> Arranged<G, TraceAgent<Tr>> where
G: Scope<Timestamp = Tr::Time>,
Tr::Time: Timestamp,
Same as import, but allows naming the source.
pub fn import_core<G>(
&mut self,
scope: &G,
name: &str
) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>) where
G: Scope<Timestamp = Tr::Time>,
Tr::Time: Timestamp,
[src]
&mut self,
scope: &G,
name: &str
) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>) where
G: Scope<Timestamp = Tr::Time>,
Tr::Time: Timestamp,
Imports an arrangement into the supplied scope.
Examples
extern crate timely; extern crate differential_dataflow; use timely::Config; use timely::dataflow::ProbeHandle; use timely::dataflow::operators::Probe; use differential_dataflow::input::InputSession; use differential_dataflow::operators::arrange::ArrangeBySelf; use differential_dataflow::operators::reduce::Reduce; use differential_dataflow::trace::Trace; use differential_dataflow::trace::implementations::ord::OrdValSpine; fn main() { ::timely::execute(Config::thread(), |worker| { let mut input = InputSession::<_,(),isize>::new(); let mut probe = ProbeHandle::new(); // create a first dataflow let mut trace = worker.dataflow::<u32,_,_>(|scope| { // create input handle and collection. input.to_collection(scope) .arrange_by_self() .trace }); // do some work. worker.step(); worker.step(); // create a second dataflow let mut shutdown = worker.dataflow(|scope| { let (arrange, button) = trace.import_core(scope, "Import"); arrange.stream.probe_with(&mut probe); button }); worker.step(); worker.step(); assert!(!probe.done()); shutdown.press(); worker.step(); worker.step(); assert!(probe.done()); }).unwrap(); }
pub fn import_frontier<G>(
&mut self,
scope: &G,
name: &str
) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>) where
G: Scope<Timestamp = Tr::Time>,
Tr::Time: Timestamp + Lattice + Ord + Clone + 'static,
Tr: TraceReader,
[src]
&mut self,
scope: &G,
name: &str
) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>) where
G: Scope<Timestamp = Tr::Time>,
Tr::Time: Timestamp + Lattice + Ord + Clone + 'static,
Tr: TraceReader,
Imports an arrangement into the supplied scope.
This variant of import uses the frontier reported by get_logical_compaction
to forcibly advance timestamps in updates.
Examples
extern crate timely; extern crate differential_dataflow; use timely::Config; use timely::progress::frontier::AntichainRef; use timely::dataflow::ProbeHandle; use timely::dataflow::operators::Probe; use timely::dataflow::operators::Inspect; use differential_dataflow::input::InputSession; use differential_dataflow::operators::arrange::ArrangeBySelf; use differential_dataflow::operators::reduce::Reduce; use differential_dataflow::trace::Trace; use differential_dataflow::trace::TraceReader; use differential_dataflow::trace::implementations::ord::OrdValSpine; use differential_dataflow::input::Input; fn main() { ::timely::execute(Config::thread(), |worker| { let mut probe = ProbeHandle::new(); // create a first dataflow let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| { // create input handle and collection. let (handle, stream) = scope.new_collection(); let trace = stream.arrange_by_self().trace; (handle, trace) }); handle.insert(0); handle.advance_to(1); handle.flush(); worker.step(); handle.remove(0); handle.advance_to(2); handle.flush(); worker.step(); handle.insert(1); handle.advance_to(3); handle.flush(); worker.step(); handle.remove(1); handle.advance_to(4); handle.flush(); worker.step(); handle.insert(0); handle.advance_to(5); handle.flush(); worker.step(); trace.set_logical_compaction(AntichainRef::new(&[5])); // create a second dataflow let mut shutdown = worker.dataflow(|scope| { let (arrange, button) = trace.import_frontier(scope, "Import"); arrange .as_collection(|k,v| (*k,*v)) .inner .inspect(|(d,t,r)| { assert!(t >= &5); }) .probe_with(&mut probe); button }); worker.step(); worker.step(); assert!(!probe.done()); shutdown.press(); worker.step(); worker.step(); assert!(probe.done()); }).unwrap(); }
pub fn import_frontier_core<G>(
&mut self,
scope: &G,
name: &str,
frontier: Antichain<Tr::Time>
) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>) where
G: Scope<Timestamp = Tr::Time>,
Tr::Time: Timestamp + Lattice + Ord + Clone + 'static,
Tr: TraceReader,
[src]
&mut self,
scope: &G,
name: &str,
frontier: Antichain<Tr::Time>
) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>) where
G: Scope<Timestamp = Tr::Time>,
Tr::Time: Timestamp + Lattice + Ord + Clone + 'static,
Tr: TraceReader,
Import a trace advanced to a specific frontier.
Trait Implementations
impl<Tr> Clone for TraceAgent<Tr> where
Tr: TraceReader,
Tr::Time: Lattice + Ord + Clone + 'static,
[src]
Tr: TraceReader,
Tr::Time: Lattice + Ord + Clone + 'static,
fn clone(&self) -> Self
[src]
pub fn clone_from(&mut self, source: &Self)
1.0.0[src]
impl<Tr> Drop for TraceAgent<Tr> where
Tr: TraceReader,
Tr::Time: Lattice + Ord + Clone + 'static,
[src]
Tr: TraceReader,
Tr::Time: Lattice + Ord + Clone + 'static,
impl<Tr> TraceReader for TraceAgent<Tr> where
Tr: TraceReader,
Tr::Time: Lattice + Ord + Clone + 'static,
[src]
Tr: TraceReader,
Tr::Time: Lattice + Ord + Clone + 'static,
type Key = Tr::Key
Key by which updates are indexed.
type Val = Tr::Val
Values associated with keys.
type Time = Tr::Time
Timestamps associated with updates.
type R = Tr::R
Associated update.
type Batch = Tr::Batch
The type of an immutable collection of updates.
type Cursor = Tr::Cursor
The type used to enumerate the collection's contents.
fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>)
[src]
fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time>
[src]
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>)
[src]
fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time>
[src]
fn cursor_through(
&mut self,
frontier: AntichainRef<'_, Tr::Time>
) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)>
[src]
&mut self,
frontier: AntichainRef<'_, Tr::Time>
) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)>
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F)
[src]
fn cursor(
&mut self
) -> (Self::Cursor, <Self::Cursor as Cursor<Self::Key, Self::Val, Self::Time, Self::R>>::Storage)
[src]
&mut self
) -> (Self::Cursor, <Self::Cursor as Cursor<Self::Key, Self::Val, Self::Time, Self::R>>::Storage)
fn advance_by(&mut self, frontier: AntichainRef<'_, Self::Time>)
[src]
fn advance_frontier(&mut self) -> AntichainRef<'_, Self::Time>
[src]
fn distinguish_since(&mut self, frontier: AntichainRef<'_, Self::Time>)
[src]
fn distinguish_frontier(&mut self) -> AntichainRef<'_, Self::Time>
[src]
fn read_upper(&mut self, target: &mut Antichain<Self::Time>) where
Self::Time: Timestamp,
[src]
Self::Time: Timestamp,
fn advance_upper(&mut self, upper: &mut Antichain<Self::Time>) where
Self::Time: Timestamp,
[src]
Self::Time: Timestamp,
Auto Trait Implementations
impl<Tr> !RefUnwindSafe for TraceAgent<Tr>
impl<Tr> !Send for TraceAgent<Tr>
impl<Tr> !Sync for TraceAgent<Tr>
impl<Tr> Unpin for TraceAgent<Tr> where
<Tr as TraceReader>::Time: Unpin,
<Tr as TraceReader>::Time: Unpin,
impl<Tr> !UnwindSafe for TraceAgent<Tr>
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized,
[src]
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized,
[src]
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
[src]
T: ?Sized,
pub fn borrow_mut(&mut self) -> &mut T
[src]
impl<T> Data for T where
T: 'static + Clone,
[src]
T: 'static + Clone,
impl<T> From<T> for T
[src]
impl<T, U> Into<U> for T where
U: From<T>,
[src]
U: From<T>,
impl<T> ToOwned for T where
T: Clone,
[src]
T: Clone,
type Owned = T
The resulting type after obtaining ownership.
pub fn to_owned(&self) -> T
[src]
pub fn clone_into(&self, target: &mut T)
[src]
impl<T, U> TryFrom<U> for T where
U: Into<T>,
[src]
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
pub fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
[src]
impl<T, U> TryInto<U> for T where
U: TryFrom<T>,
[src]
U: TryFrom<T>,