differential_dataflow/trace/mod.rs

//! Traits and data structures representing a collection trace.
//!
//! A collection trace is a set of updates of the form `(key, val, time, diff)`, which determine the contents
//! of a collection at given times by accumulating those updates whose time field is less than or equal to the
//! target time.
//!
//! The `Trace` trait describes the types and methods that a data structure must implement to be viewed as a
//! collection trace. This trait allows operator implementations to be generic with respect to the type of trace,
//! and allows various data structures to be interpreted as multiple different types of trace.
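//!
//! For intuition, the accumulation rule can be sketched as follows. This is an illustration of the
//! semantics only, not an interface provided by this module; `updates`, `query_time`, and the key and
//! value types are placeholders.
//!
//! ```ignore
//! use std::collections::HashMap;
//!
//! // Sum the diffs of all updates whose time is less than or equal to `query_time`.
//! let mut contents = HashMap::new();
//! for (key, val, time, diff) in updates {
//!     if time <= query_time {
//!         *contents.entry((key, val)).or_insert(0) += diff;
//!     }
//! }
//! // Pairs whose accumulated diff is zero are absent from the collection at `query_time`.
//! contents.retain(|_, diff| *diff != 0);
//! ```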

pub mod cursor;
pub mod description;
pub mod implementations;
pub mod wrappers;

use timely::communication::message::RefOrMut;
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::progress::Timestamp;

use crate::logging::DifferentialEvent;
use crate::trace::cursor::MyTrait;

// use ::difference::Semigroup;
pub use self::cursor::Cursor;
pub use self::description::Description;

/// A type used to express how much effort a trace should exert even in the absence of updates.
pub type ExertionLogic = std::sync::Arc<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>+Send+Sync>;

// The traces, batches, and cursors want the flexibility to appear as if they manage certain types of keys and
// values and such, while perhaps using other representations; I'm thinking mostly of wrappers around the keys
// and vals that change the `Ord` implementation, or stash hash codes, or the like.
//
// This complicates what requirements we make so that the trace is still usable by someone who knows only about
// the base key and value types. For example, the complex types should likely dereference to the simpler types,
// so that the user can make sense of the result as if they were given references to the simpler types. At the
// same time, the collection should be formable from base types (perhaps we need an `Into` or `From` constraint),
// and we should, somehow, be able to take a reference to the simple types to compare against the more complex
// types. This second one is also like an `Into` or `From` constraint, except that we start with a reference and
// really don't need anything more complex than a reference, but we can't form an owned copy of the complex type
// without cloning it.
//
// We could just start by cloning things. Worry about wrapping references later on.

/// A trace whose contents may be read.
///
/// This is a restricted interface to the more general `Trace` trait, which extends this trait with further methods
/// to update the contents of the trace. These methods are used to examine the contents, and to update the reader's
/// capabilities (which may release restrictions on the mutations to the underlying trace and cause work to happen).
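///
/// As a sketch of how the read side is typically used (assuming `trace` is a handle implementing
/// this trait), one can enumerate the trace's contents with a cursor:
///
/// ```ignore
/// let (mut cursor, storage) = trace.cursor();
/// while cursor.key_valid(&storage) {
///     while cursor.val_valid(&storage) {
///         // Each `(time, diff)` pair for the current `(key, val)` is presented in turn.
///         cursor.map_times(&storage, |time, diff| println!("{:?} {:?}", time, diff));
///         cursor.step_val(&storage);
///     }
///     cursor.step_key(&storage);
/// }
/// ```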
pub trait TraceReader {

    /// Key by which updates are indexed.
    type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>;
    /// Owned version of the above.
    type KeyOwned: Ord + Clone;
    /// Values associated with keys.
    type Val<'a>: Copy + Clone + MyTrait<'a, Owned = Self::ValOwned>;
    /// Owned version of the above.
    type ValOwned: Ord + Clone;
    /// Timestamps associated with updates.
    type Time;
    /// Associated update.
    type Diff;

    /// The type of an immutable collection of updates.
    type Batch: for<'a> BatchReader<Key<'a> = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, ValOwned = Self::ValOwned, Time = Self::Time, Diff = Self::Diff>+Clone+'static;

    /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`.
    type Storage;

    /// The type used to enumerate the collection's contents.
    type Cursor: for<'a> Cursor<Storage=Self::Storage, Key<'a> = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, ValOwned = Self::ValOwned, Time = Self::Time, Diff = Self::Diff>;

    /// Provides a cursor over updates contained in the trace.
    fn cursor(&mut self) -> (Self::Cursor, Self::Storage) {
        if let Some(cursor) = self.cursor_through(Antichain::new().borrow()) {
            cursor
        }
        else {
            panic!("unable to acquire complete cursor for trace; is it closed?");
        }
    }

    /// Acquires a cursor to the restriction of the collection's contents to updates at times not greater or
    /// equal to an element of `upper`.
    ///
    /// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from
    /// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should
    /// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This
    /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses.
    fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, Self::Storage)>;

    /// Advances the frontier that constrains logical compaction.
    ///
    /// Logical compaction is the ability of the trace to change the times of the updates it contains.
    /// Update times may be changed as long as their comparison to all query times beyond the logical compaction
    /// frontier remains unchanged. Practically, this means that groups of timestamps not beyond the frontier can
    /// be coalesced into fewer representative times.
    ///
    /// Logical compaction is important, as it allows the trace to forget historical distinctions between update
    /// times, and maintain a compact memory footprint over an unbounded update history.
    ///
    /// By advancing the logical compaction frontier, the caller unblocks merging of otherwise equivalent updates,
    /// but loses the ability to observe historical detail that is not beyond `frontier`.
    ///
    /// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
    /// this method, or the initial value of `get_logical_compaction()` if this method has not yet been called.
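    ///
    /// As a sketch (with `trace` a hypothetical handle over `u64` timestamps), advancing the frontier to `5`
    /// permits all updates at earlier times to be coalesced into a single representative time:
    ///
    /// ```ignore
    /// use timely::progress::frontier::AntichainRef;
    /// trace.set_logical_compaction(AntichainRef::new(&[5u64]));
    /// ```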
    fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>);

    /// Deprecated form of `set_logical_compaction`.
    #[deprecated(since = "0.11", note = "please use `set_logical_compaction`")]
    fn advance_by(&mut self, frontier: AntichainRef<Self::Time>) {
        self.set_logical_compaction(frontier);
    }

    /// Reports the logical compaction frontier.
    ///
    /// All update times beyond this frontier will be presented with their original times, and all update times
    /// not beyond this frontier will present as a time that compares identically with all query times beyond
    /// this frontier. Practically, update times not beyond this frontier should not be taken to be accurate as
    /// presented, and should be used carefully, only in accumulation to times that are beyond the frontier.
    fn get_logical_compaction(&mut self) -> AntichainRef<Self::Time>;

    /// Deprecated form of `get_logical_compaction`.
    #[deprecated(since = "0.11", note = "please use `get_logical_compaction`")]
    fn advance_frontier(&mut self) -> AntichainRef<Self::Time> {
        self.get_logical_compaction()
    }

    /// Advances the frontier that constrains physical compaction.
    ///
    /// Physical compaction is the ability of the trace to merge the batches of updates it maintains. Physical
    /// compaction does not change the updates or their timestamps, although it is also the moment at which
    /// logical compaction is most likely to happen.
    ///
    /// Physical compaction allows the trace to maintain a logarithmic number of batches of updates, which is
    /// what allows the trace to provide efficient random access by keys and values.
    ///
    /// By advancing the physical compaction frontier, the caller unblocks the merging of batches of updates,
    /// but loses the ability to create a cursor through any frontier not beyond `frontier`.
    ///
    /// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
    /// this method, or the initial value of `get_physical_compaction()` if this method has not yet been called.
    fn set_physical_compaction(&mut self, frontier: AntichainRef<Self::Time>);

    /// Deprecated form of `set_physical_compaction`.
    #[deprecated(since = "0.11", note = "please use `set_physical_compaction`")]
    fn distinguish_since(&mut self, frontier: AntichainRef<Self::Time>) {
        self.set_physical_compaction(frontier);
    }

    /// Reports the physical compaction frontier.
    ///
    /// All batches containing updates beyond this frontier will not be merged with other batches. This allows
    /// the caller to create a cursor through any frontier beyond the physical compaction frontier, with the
    /// `cursor_through()` method. This functionality is primarily of interest to the `join` operator, and any
    /// other operators that need to take notice of the physical structure of update batches.
    fn get_physical_compaction(&mut self) -> AntichainRef<Self::Time>;

    /// Deprecated form of `get_physical_compaction`.
    #[deprecated(since = "0.11", note = "please use `get_physical_compaction`")]
    fn distinguish_frontier(&mut self) -> AntichainRef<Self::Time> {
        self.get_physical_compaction()
    }

    /// Maps logic across the non-empty sequence of batches in the trace.
    ///
    /// This is currently used only to extract historical data to prime late-starting operators who want to reproduce
    /// the stream of batches moving past the trace. It could also be a fine basis for a default implementation of the
    /// cursor methods, as they (by default) just move through batches accumulating cursors into a cursor list.
    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F);

    /// Reads the upper frontier of committed times.
    ///
    /// The default implementation initializes `target` to the minimal frontier and then overwrites it
    /// with the upper frontier of each batch in turn, leaving the upper frontier of the most recently
    /// introduced batch.
    #[inline]
    fn read_upper(&mut self, target: &mut Antichain<Self::Time>)
    where
        Self::Time: Timestamp,
    {
        target.clear();
        target.insert(<Self::Time as timely::progress::Timestamp>::minimum());
        self.map_batches(|batch| {
            target.clone_from(batch.upper());
        });
    }

    /// Advances `upper` by any empty batches.
    ///
    /// An empty batch whose `batch.lower` bound equals the current
    /// contents of `upper` will advance `upper` to `batch.upper`.
    /// Taken across all batches, this should advance `upper` across
    /// empty batch regions.
    fn advance_upper(&mut self, upper: &mut Antichain<Self::Time>)
    where
        Self::Time: Timestamp,
    {
        self.map_batches(|batch| {
            if batch.is_empty() && batch.lower() == upper {
                upper.clone_from(batch.upper());
            }
        });
    }

}

/// An append-only collection of `(key, val, time, diff)` tuples.
///
/// The trace must pretend to look like a collection of `(Key, Val, Time, isize)` tuples, but is permitted
/// to introduce new types `KeyRef`, `ValRef`, and `TimeRef` which can be dereferenced to the types above.
///
/// The trace must be constructible from, and navigable by, the `Key`, `Val`, and `Time` types, but does not
/// need to return them.
pub trait Trace : TraceReader
where <Self as TraceReader>::Batch: Batch {

    /// A type used to assemble batches from disordered updates.
    type Batcher: Batcher<Time = Self::Time>;
    /// A type used to assemble batches from ordered update sequences.
    type Builder: Builder<Item=<Self::Batcher as Batcher>::Item, Time=Self::Time, Output = Self::Batch>;

    /// Allocates a new empty trace.
    fn new(
        info: ::timely::dataflow::operators::generic::OperatorInfo,
        logging: Option<crate::logging::Logger>,
        activator: Option<timely::scheduling::activate::Activator>,
    ) -> Self;

    /// Exert merge effort, even without updates.
    fn exert(&mut self);

    /// Sets the logic for exertion in the absence of updates.
    ///
    /// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`,
    /// indicating the level, the number of batches, and their total length in updates. It should return a number of
    /// updates to perform, or `None` if no work is required.
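    ///
    /// As a hedged sketch, one possible policy exerts effort proportional to the total number of
    /// pending updates whenever some level holds more than one batch (the constants and the handle
    /// `trace` are illustrative, not defaults):
    ///
    /// ```ignore
    /// use differential_dataflow::trace::ExertionLogic;
    ///
    /// let logic: ExertionLogic = std::sync::Arc::new(|levels| {
    ///     // `levels` yields `(level, batch count, total updates)` triples, from large to small.
    ///     let mut effort = None;
    ///     for (_level, count, length) in levels {
    ///         if count > 1 {
    ///             effort = Some(effort.unwrap_or(0) + length);
    ///         }
    ///     }
    ///     effort
    /// });
    /// trace.set_exert_logic(logic);
    /// ```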
    fn set_exert_logic(&mut self, logic: ExertionLogic);

    /// Introduces a batch of updates to the trace.
    ///
    /// Batches describe the time intervals they contain, and they should be added to the trace in contiguous
    /// intervals. If a batch arrives with a lower bound that does not equal the upper bound of the most recent
    /// addition, the trace will add an empty batch. It is an error to then try to populate that region of time.
    ///
    /// This restriction could be relaxed, especially if we discover ways in which batch interval order could
    /// commute. For now, the trace should complain, to the extent that it cares about contiguous intervals.
    fn insert(&mut self, batch: Self::Batch);

    /// Introduces an empty batch concluding the trace.
    ///
    /// This method should be logically equivalent to introducing an empty batch whose lower frontier equals
    /// the upper frontier of the most recently introduced batch, and whose upper frontier is empty.
    fn close(&mut self);
}

/// A batch of updates whose contents may be read.
///
/// This is a restricted interface to batches of updates, which support the reading of the batch's contents,
/// but do not expose ways to construct the batches. This trait is appropriate for views of the batch, and is
/// especially useful for views derived from other sources in ways that prevent the construction of batches
/// from the type of data in the view (for example, filtered views, or views with extended time coordinates).
pub trait BatchReader
where
    Self: ::std::marker::Sized,
{
    /// Key by which updates are indexed.
    type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>;
    /// Owned version of the above.
    type KeyOwned: Ord + Clone;
    /// Values associated with keys.
    type Val<'a>: Copy + Clone + MyTrait<'a, Owned = Self::ValOwned>;
    /// Owned version of the above.
    type ValOwned: Ord + Clone;
    /// Timestamps associated with updates.
    type Time: Timestamp;
    /// Associated update.
    type Diff;

    /// The type used to enumerate the batch's contents.
    type Cursor: for<'a> Cursor<Storage=Self, Key<'a> = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, ValOwned = Self::ValOwned, Time = Self::Time, Diff = Self::Diff>;
    /// Acquires a cursor to the batch's contents.
    fn cursor(&self) -> Self::Cursor;
    /// The number of updates in the batch.
    fn len(&self) -> usize;
    /// True if the batch is empty.
    fn is_empty(&self) -> bool { self.len() == 0 }
    /// Describes the times of the updates in the batch.
    fn description(&self) -> &Description<Self::Time>;

    /// All times in the batch are greater or equal to an element of `lower`.
    fn lower(&self) -> &Antichain<Self::Time> { self.description().lower() }
    /// All times in the batch are not greater or equal to any element of `upper`.
    fn upper(&self) -> &Antichain<Self::Time> { self.description().upper() }
}

/// An immutable collection of updates.
pub trait Batch : BatchReader where Self: ::std::marker::Sized {
    /// A type used to progressively merge batches.
    type Merger: Merger<Self>;

    /// Initiates the merging of consecutive batches.
    ///
    /// The result of this method can be exercised to eventually produce the same result
    /// that a call to `self.merge(other)` would produce, but it can be done in a measured
    /// fashion. This can help to avoid latency spikes where a large merge needs to happen.
    fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<Self::Time>) -> Self::Merger {
        Self::Merger::new(self, other, compaction_frontier)
    }
}

/// Functionality for collecting and batching updates.
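///
/// As a sketch of intended use inside an operator (with `MyBatcher` and `MyBuilder` hypothetical
/// implementors of `Batcher` and `Builder`, and `updates` a `Vec` of pending items):
///
/// ```ignore
/// use timely::communication::message::RefOrMut;
/// use timely::progress::Antichain;
///
/// let mut batcher = MyBatcher::new(None, operator_id);
/// // Absorb unordered updates as they arrive.
/// batcher.push_batch(RefOrMut::Mut(&mut updates));
/// // Seal all updates not beyond `upper` into a batch, assembled by `MyBuilder`.
/// let batch = batcher.seal::<MyBuilder>(Antichain::from_elem(10u64));
/// ```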
pub trait Batcher {
    /// Type of update pushed into the batcher.
    type Item;
    /// Times at which batches are formed.
    type Time: Timestamp;
    /// Allocates a new empty batcher.
    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self;
    /// Adds an unordered batch of elements to the batcher.
    fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>);
    /// Returns all updates not greater or equal to an element of `upper`.
    fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
    /// Returns the lower envelope of contained update times.
    fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<Self::Time>;
}

/// Functionality for building batches from ordered update sequences.
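///
/// As a sketch (with `MyBuilder` a hypothetical implementor, `ordered_updates` an already ordered
/// sequence of its items, and `keys`, `vals`, `upds` estimated counts):
///
/// ```ignore
/// use timely::progress::Antichain;
///
/// let mut builder = MyBuilder::with_capacity(keys, vals, upds);
/// builder.extend(ordered_updates.into_iter());
/// // The finished batch records its lower and upper time bounds, and its compaction frontier.
/// let batch = builder.done(
///     Antichain::from_elem(0u64),
///     Antichain::from_elem(10u64),
///     Antichain::from_elem(0u64),
/// );
/// ```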
pub trait Builder: Sized {
    /// Input item type.
    type Item;
    /// Timestamp type.
    type Time: Timestamp;
    /// Output batch type.
    type Output;

    /// Allocates an empty builder.
    ///
    /// Ideally we deprecate this and insist all non-trivial building happens via `with_capacity()`.
    // #[deprecated]
    fn new() -> Self { Self::with_capacity(0, 0, 0) }
    /// Allocates an empty builder with capacity for the specified keys, values, and updates.
    ///
    /// The arguments represent, respectively, the number of distinct keys, distinct `(key, val)` pairs, and total updates.
    fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;
    /// Adds an element to the batch.
    ///
    /// The default implementation uses `self.copy` with references to the owned arguments.
    /// One should override it if the builder can take advantage of owned arguments.
    fn push(&mut self, element: Self::Item) {
        self.copy(&element);
    }
    /// Adds an element to the batch.
    fn copy(&mut self, element: &Self::Item);
    /// Adds an ordered sequence of elements to the batch.
    fn extend<I: Iterator<Item=Self::Item>>(&mut self, iter: I) {
        for item in iter { self.push(item); }
    }
    /// Completes building and returns the batch.
    fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output;
}

/// Represents a merge in progress.
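///
/// A sketch of driving a merge to completion with bounded work per step (assuming `batch1` and
/// `batch2` are batches of some type `B: Batch`, and `frontier` is a compaction frontier):
///
/// ```ignore
/// let mut merger = batch1.begin_merge(&batch2, frontier);
/// let merged = loop {
///     // Grant a fixed budget of work; `work` decrements `fuel` as it proceeds.
///     let mut fuel = 1_000_000;
///     merger.work(&batch1, &batch2, &mut fuel);
///     // Fuel left over after the call means the merge has completed.
///     if fuel > 0 { break merger.done(); }
/// };
/// ```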
pub trait Merger<Output: Batch> {
    /// Creates a new merger to merge the supplied batches, optionally compacting
    /// up to the supplied frontier.
    fn new(source1: &Output, source2: &Output, compaction_frontier: AntichainRef<Output::Time>) -> Self;
    /// Performs some amount of work, decrementing `fuel`.
    ///
    /// If `fuel` is non-zero after the call, the merging is complete and
    /// one should call `done` to extract the merged results.
    fn work(&mut self, source1: &Output, source2: &Output, fuel: &mut isize);
    /// Extracts merged results.
    ///
    /// This method should only be called after `work` has been called and
    /// has not brought `fuel` to zero. Otherwise, the merge is still in
    /// progress.
    fn done(self) -> Output;
}


/// Blanket implementations for reference counted batches.
pub mod rc_blanket_impls {

    use std::rc::Rc;

    use timely::progress::{Antichain, frontier::AntichainRef};
    use super::{Batch, BatchReader, Builder, Merger, Cursor, Description};

    impl<B: BatchReader> BatchReader for Rc<B> {
        type Key<'a> = B::Key<'a>;
        type KeyOwned = B::KeyOwned;
        type Val<'a> = B::Val<'a>;
        type ValOwned = B::ValOwned;
        type Time = B::Time;
        type Diff = B::Diff;

        /// The type used to enumerate the batch's contents.
        type Cursor = RcBatchCursor<B::Cursor>;
        /// Acquires a cursor to the batch's contents.
        fn cursor(&self) -> Self::Cursor {
            RcBatchCursor::new((**self).cursor())
        }

        /// The number of updates in the batch.
        fn len(&self) -> usize { (**self).len() }
        /// Describes the times of the updates in the batch.
        fn description(&self) -> &Description<Self::Time> { (**self).description() }
    }

    /// Wrapper to provide cursor to nested scope.
    pub struct RcBatchCursor<C> {
        cursor: C,
    }

    impl<C> RcBatchCursor<C> {
        fn new(cursor: C) -> Self {
            RcBatchCursor {
                cursor,
            }
        }
    }

    impl<C: Cursor> Cursor for RcBatchCursor<C> {

        type Key<'a> = C::Key<'a>;
        type KeyOwned = C::KeyOwned;
        type Val<'a> = C::Val<'a>;
        type ValOwned = C::ValOwned;
        type Time = C::Time;
        type Diff = C::Diff;

        type Storage = Rc<C::Storage>;

        #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
        #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }

        #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
        #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

        #[inline]
        fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, logic: L) {
            self.cursor.map_times(storage, logic)
        }

        #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
        #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }

        #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
        #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }

        #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
        #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
    }

    /// An immutable collection of updates.
    impl<B: Batch> Batch for Rc<B> {
        type Merger = RcMerger<B>;
    }

    /// Wrapper type for building reference counted batches.
    pub struct RcBuilder<B: Builder> { builder: B }

    /// Functionality for building batches from ordered update sequences.
    impl<B: Builder> Builder for RcBuilder<B> {
        type Item = B::Item;
        type Time = B::Time;
        type Output = Rc<B::Output>;
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } }
        fn push(&mut self, element: Self::Item) { self.builder.push(element) }
        fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
        fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Rc<B::Output> { Rc::new(self.builder.done(lower, upper, since)) }
    }

    /// Wrapper type for merging reference counted batches.
    pub struct RcMerger<B:Batch> { merger: B::Merger }

    /// Represents a merge in progress.
    impl<B:Batch> Merger<Rc<B>> for RcMerger<B> {
        fn new(source1: &Rc<B>, source2: &Rc<B>, compaction_frontier: AntichainRef<B::Time>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } }
        fn work(&mut self, source1: &Rc<B>, source2: &Rc<B>, fuel: &mut isize) { self.merger.work(source1, source2, fuel) }
        fn done(self) -> Rc<B> { Rc::new(self.merger.done()) }
    }
}


/// Blanket implementations for abomonated batches.
pub mod abomonated_blanket_impls {
    use abomonation::{Abomonation, measure};
    use abomonation::abomonated::Abomonated;
    use timely::progress::{Antichain, frontier::AntichainRef};

    use super::{Batch, BatchReader, Builder, Merger, Cursor, Description};

    impl<B: BatchReader+Abomonation> BatchReader for Abomonated<B, Vec<u8>> {

        type Key<'a> = B::Key<'a>;
        type KeyOwned = B::KeyOwned;
        type Val<'a> = B::Val<'a>;
        type ValOwned = B::ValOwned;
        type Time = B::Time;
        type Diff = B::Diff;

        /// The type used to enumerate the batch's contents.
        type Cursor = AbomonatedBatchCursor<B::Cursor>;
        /// Acquires a cursor to the batch's contents.
        fn cursor(&self) -> Self::Cursor {
            AbomonatedBatchCursor::new((**self).cursor())
        }

        /// The number of updates in the batch.
        fn len(&self) -> usize { (**self).len() }
        /// Describes the times of the updates in the batch.
        fn description(&self) -> &Description<Self::Time> { (**self).description() }
    }

    /// Wrapper to provide cursor to nested scope.
    pub struct AbomonatedBatchCursor<C> {
        cursor: C,
    }

    impl<C> AbomonatedBatchCursor<C> {
        fn new(cursor: C) -> Self {
            AbomonatedBatchCursor {
                cursor,
            }
        }
    }

    impl<C: Cursor> Cursor for AbomonatedBatchCursor<C> where C::Storage: Abomonation {

        type Key<'a> = C::Key<'a>;
        type KeyOwned = C::KeyOwned;
        type Val<'a> = C::Val<'a>;
        type ValOwned = C::ValOwned;
        type Time = C::Time;
        type Diff = C::Diff;

        type Storage = Abomonated<C::Storage, Vec<u8>>;

        #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
        #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }

        #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
        #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

        #[inline]
        fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, logic: L) {
            self.cursor.map_times(storage, logic)
        }

        #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
        #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }

        #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
        #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }

        #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
        #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
    }

    /// An immutable collection of updates.
    impl<B: Batch+Abomonation> Batch for Abomonated<B, Vec<u8>> {
        type Merger = AbomonatedMerger<B>;
    }

    /// Wrapper type for building abomonated batches.
    pub struct AbomonatedBuilder<B: Builder> { builder: B }

    /// Functionality for building batches from ordered update sequences.
    impl<B: Builder> Builder for AbomonatedBuilder<B>
    where
        B::Output: Abomonation,
    {
        type Item = B::Item;
        type Time = B::Time;
        type Output = Abomonated<B::Output, Vec<u8>>;
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } }
        fn push(&mut self, element: Self::Item) { self.builder.push(element) }
        fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
        fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output {
            let batch = self.builder.done(lower, upper, since);
            let mut bytes = Vec::with_capacity(measure(&batch));
            unsafe { abomonation::encode(&batch, &mut bytes).unwrap() };
            unsafe { Abomonated::<B::Output,_>::new(bytes).unwrap() }
        }
    }

    /// Wrapper type for merging abomonated batches.
    pub struct AbomonatedMerger<B:Batch> { merger: B::Merger }

    /// Represents a merge in progress.
    impl<B:Batch+Abomonation> Merger<Abomonated<B,Vec<u8>>> for AbomonatedMerger<B> {
        fn new(source1: &Abomonated<B,Vec<u8>>, source2: &Abomonated<B,Vec<u8>>, compaction_frontier: AntichainRef<B::Time>) -> Self {
            AbomonatedMerger { merger: B::begin_merge(source1, source2, compaction_frontier) }
        }
        fn work(&mut self, source1: &Abomonated<B,Vec<u8>>, source2: &Abomonated<B,Vec<u8>>, fuel: &mut isize) {
            self.merger.work(source1, source2, fuel)
        }
        fn done(self) -> Abomonated<B, Vec<u8>> {
            let batch = self.merger.done();
            let mut bytes = Vec::with_capacity(measure(&batch));
            unsafe { abomonation::encode(&batch, &mut bytes).unwrap() };
            unsafe { Abomonated::<B,_>::new(bytes).unwrap() }
        }
    }
}