differential_dataflow/trace/mod.rs
1//! Traits and datastructures representing a collection trace.
2//!
//! A collection trace is a set of updates of the form `(key, val, time, diff)`, which determine the contents
//! of a collection at given times by accumulating updates whose time field is less than or equal to the target time.
5//!
6//! The `Trace` trait describes those types and methods that a data structure must implement to be viewed as a
7//! collection trace. This trait allows operator implementations to be generic with respect to the type of trace,
8//! and allows various data structures to be interpretable as multiple different types of trace.
9
10pub mod cursor;
11pub mod description;
12pub mod implementations;
13pub mod wrappers;
14
15use timely::communication::message::RefOrMut;
16use timely::logging::WorkerIdentifier;
17use timely::logging_core::Logger;
18use timely::progress::{Antichain, frontier::AntichainRef};
19use timely::progress::Timestamp;
20
21use crate::logging::DifferentialEvent;
22use crate::trace::cursor::MyTrait;
23
24// use ::difference::Semigroup;
25pub use self::cursor::Cursor;
26pub use self::description::Description;
27
/// A type used to express how much effort a trace should exert even in the absence of updates.
///
/// The wrapped function is handed a boxed iterator of `(usize, usize, usize)` triples and returns an
/// `Option<usize>`. `Trace::set_exert_logic` documents the triples as `(level, count, length)` for each
/// batch level, and the result as a number of updates of work to perform, with `None` meaning that no
/// work is required.
pub type ExertionLogic = std::sync::Arc<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>+Send+Sync>;
30
// The traces and batches and cursors want the flexibility to appear as if they manage certain types of keys and
// values and such, while perhaps using other representations. I'm thinking mostly of wrappers around the keys
// and vals that change the `Ord` implementation, or stash hash codes, or the like.
34//
35// This complicates what requirements we make so that the trace is still usable by someone who knows only about
36// the base key and value types. For example, the complex types should likely dereference to the simpler types,
37// so that the user can make sense of the result as if they were given references to the simpler types. At the
38// same time, the collection should be formable from base types (perhaps we need an `Into` or `From` constraint)
39// and we should, somehow, be able to take a reference to the simple types to compare against the more complex
40// types. This second one is also like an `Into` or `From` constraint, except that we start with a reference and
41// really don't need anything more complex than a reference, but we can't form an owned copy of the complex type
42// without cloning it.
43//
44// We could just start by cloning things. Worry about wrapping references later on.
45
46/// A trace whose contents may be read.
47///
48/// This is a restricted interface to the more general `Trace` trait, which extends this trait with further methods
49/// to update the contents of the trace. These methods are used to examine the contents, and to update the reader's
50/// capabilities (which may release restrictions on the mutations to the underlying trace and cause work to happen).
51pub trait TraceReader {
52
53 /// Key by which updates are indexed.
54 type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>;
55 /// Owned version of the above.
56 type KeyOwned: Ord + Clone;
57 /// Values associated with keys.
58 type Val<'a>: Copy + Clone + MyTrait<'a, Owned = Self::ValOwned>;
59 /// Owned version of the above.
60 type ValOwned: Ord + Clone;
61 /// Timestamps associated with updates
62 type Time;
63 /// Associated update.
64 type Diff;
65
66 /// The type of an immutable collection of updates.
67 type Batch: for<'a> BatchReader<Key<'a> = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, ValOwned = Self::ValOwned, Time = Self::Time, Diff = Self::Diff>+Clone+'static;
68
69 /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`.
70 type Storage;
71
72 /// The type used to enumerate the collections contents.
73 type Cursor: for<'a> Cursor<Storage=Self::Storage, Key<'a> = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, ValOwned = Self::ValOwned, Time = Self::Time, Diff = Self::Diff>;
74
75 /// Provides a cursor over updates contained in the trace.
76 fn cursor(&mut self) -> (Self::Cursor, Self::Storage) {
77 if let Some(cursor) = self.cursor_through(Antichain::new().borrow()) {
78 cursor
79 }
80 else {
81 panic!("unable to acquire complete cursor for trace; is it closed?");
82 }
83 }
84
85 /// Acquires a cursor to the restriction of the collection's contents to updates at times not greater or
86 /// equal to an element of `upper`.
87 ///
88 /// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from
89 /// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should
90 /// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This
91 /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses.
92 fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, Self::Storage)>;
93
94 /// Advances the frontier that constrains logical compaction.
95 ///
96 /// Logical compaction is the ability of the trace to change the times of the updates it contains.
97 /// Update times may be changed as long as their comparison to all query times beyond the logical compaction
98 /// frontier remains unchanged. Practically, this means that groups of timestamps not beyond the frontier can
99 /// be coalesced into fewer representative times.
100 ///
101 /// Logical compaction is important, as it allows the trace to forget historical distinctions between update
102 /// times, and maintain a compact memory footprint over an unbounded update history.
103 ///
104 /// By advancing the logical compaction frontier, the caller unblocks merging of otherwise equivalent udates,
105 /// but loses the ability to observe historical detail that is not beyond `frontier`.
106 ///
107 /// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
108 /// this method, or the initial value of `get_logical_compaction()` if this method has not yet been called.
109 fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
110
111 /// Deprecated form of `set_logical_compaction`.
112 #[deprecated(since = "0.11", note = "please use `set_logical_compaction`")]
113 fn advance_by(&mut self, frontier: AntichainRef<Self::Time>) {
114 self.set_logical_compaction(frontier);
115 }
116
117 /// Reports the logical compaction frontier.
118 ///
119 /// All update times beyond this frontier will be presented with their original times, and all update times
120 /// not beyond this frontier will present as a time that compares identically with all query times beyond
121 /// this frontier. Practically, update times not beyond this frontier should not be taken to be accurate as
122 /// presented, and should be used carefully, only in accumulation to times that are beyond the frontier.
123 fn get_logical_compaction(&mut self) -> AntichainRef<Self::Time>;
124
125 /// Deprecated form of `get_logical_compaction`.
126 #[deprecated(since = "0.11", note = "please use `get_logical_compaction`")]
127 fn advance_frontier(&mut self) -> AntichainRef<Self::Time> {
128 self.get_logical_compaction()
129 }
130
131 /// Advances the frontier that constrains physical compaction.
132 ///
133 /// Physical compaction is the ability of the trace to merge the batches of updates it maintains. Physical
134 /// compaction does not change the updates or their timestamps, although it is also the moment at which
135 /// logical compaction is most likely to happen.
136 ///
137 /// Physical compaction allows the trace to maintain a logarithmic number of batches of updates, which is
138 /// what allows the trace to provide efficient random access by keys and values.
139 ///
140 /// By advancing the physical compaction frontier, the caller unblocks the merging of batches of updates,
141 /// but loses the ability to create a cursor through any frontier not beyond `frontier`.
142 ///
143 /// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
144 /// this method, or the initial value of `get_physical_compaction()` if this method has not yet been called.
145 fn set_physical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
146
147 /// Deprecated form of `set_physical_compaction`.
148 #[deprecated(since = "0.11", note = "please use `set_physical_compaction`")]
149 fn distinguish_since(&mut self, frontier: AntichainRef<Self::Time>) {
150 self.set_physical_compaction(frontier);
151 }
152
153 /// Reports the physical compaction frontier.
154 ///
155 /// All batches containing updates beyond this frontier will not be merged with ohter batches. This allows
156 /// the caller to create a cursor through any frontier beyond the physical compaction frontier, with the
157 /// `cursor_through()` method. This functionality is primarily of interest to the `join` operator, and any
158 /// other operators who need to take notice of the physical structure of update batches.
159 fn get_physical_compaction(&mut self) -> AntichainRef<Self::Time>;
160
161 /// Deprecated form of `get_physical_compaction`.
162 #[deprecated(since = "0.11", note = "please use `get_physical_compaction`")]
163 fn distinguish_frontier(&mut self) -> AntichainRef<Self::Time> {
164 self.get_physical_compaction()
165 }
166
167 /// Maps logic across the non-empty sequence of batches in the trace.
168 ///
169 /// This is currently used only to extract historical data to prime late-starting operators who want to reproduce
170 /// the stream of batches moving past the trace. It could also be a fine basis for a default implementation of the
171 /// cursor methods, as they (by default) just move through batches accumulating cursors into a cursor list.
172 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F);
173
174 /// Reads the upper frontier of committed times.
175 ///
176 ///
177 #[inline]
178 fn read_upper(&mut self, target: &mut Antichain<Self::Time>)
179 where
180 Self::Time: Timestamp,
181 {
182 target.clear();
183 target.insert(<Self::Time as timely::progress::Timestamp>::minimum());
184 self.map_batches(|batch| {
185 target.clone_from(batch.upper());
186 });
187 }
188
189 /// Advances `upper` by any empty batches.
190 ///
191 /// An empty batch whose `batch.lower` bound equals the current
192 /// contents of `upper` will advance `upper` to `batch.upper`.
193 /// Taken across all batches, this should advance `upper` across
194 /// empty batch regions.
195 fn advance_upper(&mut self, upper: &mut Antichain<Self::Time>)
196 where
197 Self::Time: Timestamp,
198 {
199 self.map_batches(|batch| {
200 if batch.is_empty() && batch.lower() == upper {
201 upper.clone_from(batch.upper());
202 }
203 });
204 }
205
206}
207
208/// An append-only collection of `(key, val, time, diff)` tuples.
209///
210/// The trace must pretend to look like a collection of `(Key, Val, Time, isize)` tuples, but is permitted
211/// to introduce new types `KeyRef`, `ValRef`, and `TimeRef` which can be dereference to the types above.
212///
213/// The trace must be constructable from, and navigable by the `Key`, `Val`, `Time` types, but does not need
214/// to return them.
215pub trait Trace : TraceReader
216where <Self as TraceReader>::Batch: Batch {
217
218 /// A type used to assemble batches from disordered updates.
219 type Batcher: Batcher<Time = Self::Time>;
220 /// A type used to assemble batches from ordered update sequences.
221 type Builder: Builder<Item=<Self::Batcher as Batcher>::Item, Time=Self::Time, Output = Self::Batch>;
222
223 /// Allocates a new empty trace.
224 fn new(
225 info: ::timely::dataflow::operators::generic::OperatorInfo,
226 logging: Option<crate::logging::Logger>,
227 activator: Option<timely::scheduling::activate::Activator>,
228 ) -> Self;
229
230 /// Exert merge effort, even without updates.
231 fn exert(&mut self);
232
233 /// Sets the logic for exertion in the absence of updates.
234 ///
235 /// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`,
236 /// indicating the level, the number of batches, and their total length in updates. It should return a number of
237 /// updates to perform, or `None` if no work is required.
238 fn set_exert_logic(&mut self, logic: ExertionLogic);
239
240 /// Introduces a batch of updates to the trace.
241 ///
242 /// Batches describe the time intervals they contain, and they should be added to the trace in contiguous
243 /// intervals. If a batch arrives with a lower bound that does not equal the upper bound of the most recent
244 /// addition, the trace will add an empty batch. It is an error to then try to populate that region of time.
245 ///
246 /// This restriction could be relaxed, especially if we discover ways in which batch interval order could
247 /// commute. For now, the trace should complain, to the extent that it cares about contiguous intervals.
248 fn insert(&mut self, batch: Self::Batch);
249
250 /// Introduces an empty batch concluding the trace.
251 ///
252 /// This method should be logically equivalent to introducing an empty batch whose lower frontier equals
253 /// the upper frontier of the most recently introduced batch, and whose upper frontier is empty.
254 fn close(&mut self);
255}
256
257/// A batch of updates whose contents may be read.
258///
259/// This is a restricted interface to batches of updates, which support the reading of the batch's contents,
260/// but do not expose ways to construct the batches. This trait is appropriate for views of the batch, and is
261/// especially useful for views derived from other sources in ways that prevent the construction of batches
262/// from the type of data in the view (for example, filtered views, or views with extended time coordinates).
263pub trait BatchReader
264where
265 Self: ::std::marker::Sized,
266{
267 /// Key by which updates are indexed.
268 type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>;
269 /// Owned version of the above.
270 type KeyOwned: Ord + Clone;
271 /// Values associated with keys.
272 type Val<'a>: Copy + Clone + MyTrait<'a, Owned = Self::ValOwned>;
273 /// Owned version of the above.
274 type ValOwned: Ord + Clone;
275 /// Timestamps associated with updates
276 type Time: Timestamp;
277 /// Associated update.
278 type Diff;
279
280 /// The type used to enumerate the batch's contents.
281 type Cursor: for<'a> Cursor<Storage=Self, Key<'a> = Self::Key<'a>, KeyOwned = Self::KeyOwned, Val<'a> = Self::Val<'a>, ValOwned = Self::ValOwned, Time = Self::Time, Diff = Self::Diff>;
282 /// Acquires a cursor to the batch's contents.
283 fn cursor(&self) -> Self::Cursor;
284 /// The number of updates in the batch.
285 fn len(&self) -> usize;
286 /// True if the batch is empty.
287 fn is_empty(&self) -> bool { self.len() == 0 }
288 /// Describes the times of the updates in the batch.
289 fn description(&self) -> &Description<Self::Time>;
290
291 /// All times in the batch are greater or equal to an element of `lower`.
292 fn lower(&self) -> &Antichain<Self::Time> { self.description().lower() }
293 /// All times in the batch are not greater or equal to any element of `upper`.
294 fn upper(&self) -> &Antichain<Self::Time> { self.description().upper() }
295}
296
297/// An immutable collection of updates.
298pub trait Batch : BatchReader where Self: ::std::marker::Sized {
299 /// A type used to progressively merge batches.
300 type Merger: Merger<Self>;
301
302 /// Initiates the merging of consecutive batches.
303 ///
304 /// The result of this method can be exercised to eventually produce the same result
305 /// that a call to `self.merge(other)` would produce, but it can be done in a measured
306 /// fashion. This can help to avoid latency spikes where a large merge needs to happen.
307 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<Self::Time>) -> Self::Merger {
308 Self::Merger::new(self, other, compaction_frontier)
309 }
310}
311
312/// Functionality for collecting and batching updates.
313pub trait Batcher {
314 /// Type of update pushed into the batcher.
315 type Item;
316 /// Times at which batches are formed.
317 type Time: Timestamp;
318 /// Allocates a new empty batcher.
319 fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self;
320 /// Adds an unordered batch of elements to the batcher.
321 fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>);
322 /// Returns all updates not greater or equal to an element of `upper`.
323 fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
324 /// Returns the lower envelope of contained update times.
325 fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<Self::Time>;
326}
327
328/// Functionality for building batches from ordered update sequences.
329pub trait Builder: Sized {
330 /// Input item type.
331 type Item;
332 /// Timestamp type.
333 type Time: Timestamp;
334 /// Output batch type.
335 type Output;
336
337 /// Allocates an empty builder.
338 ///
339 /// Ideally we deprecate this and insist all non-trivial building happens via `with_capacity()`.
340 // #[deprecated]
341 fn new() -> Self { Self::with_capacity(0, 0, 0) }
342 /// Allocates an empty builder with capacity for the specified keys, values, and updates.
343 ///
344 /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates.
345 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;
346 /// Adds an element to the batch.
347 ///
348 /// The default implementation uses `self.copy` with references to the owned arguments.
349 /// One should override it if the builder can take advantage of owned arguments.
350 fn push(&mut self, element: Self::Item) {
351 self.copy(&element);
352 }
353 /// Adds an element to the batch.
354 fn copy(&mut self, element: &Self::Item);
355 /// Adds an ordered sequence of elements to the batch.
356 fn extend<I: Iterator<Item=Self::Item>>(&mut self, iter: I) {
357 for item in iter { self.push(item); }
358 }
359 /// Completes building and returns the batch.
360 fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output;
361}
362
363/// Represents a merge in progress.
364pub trait Merger<Output: Batch> {
365 /// Creates a new merger to merge the supplied batches, optionally compacting
366 /// up to the supplied frontier.
367 fn new(source1: &Output, source2: &Output, compaction_frontier: AntichainRef<Output::Time>) -> Self;
368 /// Perform some amount of work, decrementing `fuel`.
369 ///
370 /// If `fuel` is non-zero after the call, the merging is complete and
371 /// one should call `done` to extract the merged results.
372 fn work(&mut self, source1: &Output, source2: &Output, fuel: &mut isize);
373 /// Extracts merged results.
374 ///
375 /// This method should only be called after `work` has been called and
376 /// has not brought `fuel` to zero. Otherwise, the merge is still in
377 /// progress.
378 fn done(self) -> Output;
379}
380
381
382/// Blanket implementations for reference counted batches.
383pub mod rc_blanket_impls {
384
385 use std::rc::Rc;
386
387 use timely::progress::{Antichain, frontier::AntichainRef};
388 use super::{Batch, BatchReader, Builder, Merger, Cursor, Description};
389
390 impl<B: BatchReader> BatchReader for Rc<B> {
391 type Key<'a> = B::Key<'a>;
392 type KeyOwned = B::KeyOwned;
393 type Val<'a> = B::Val<'a>;
394 type ValOwned = B::ValOwned;
395 type Time = B::Time;
396 type Diff = B::Diff;
397
398 /// The type used to enumerate the batch's contents.
399 type Cursor = RcBatchCursor<B::Cursor>;
400 /// Acquires a cursor to the batch's contents.
401 fn cursor(&self) -> Self::Cursor {
402 RcBatchCursor::new((**self).cursor())
403 }
404
405 /// The number of updates in the batch.
406 fn len(&self) -> usize { (**self).len() }
407 /// Describes the times of the updates in the batch.
408 fn description(&self) -> &Description<Self::Time> { (**self).description() }
409 }
410
411 /// Wrapper to provide cursor to nested scope.
412 pub struct RcBatchCursor<C> {
413 cursor: C,
414 }
415
416 impl<C> RcBatchCursor<C> {
417 fn new(cursor: C) -> Self {
418 RcBatchCursor {
419 cursor,
420 }
421 }
422 }
423
424 impl<C: Cursor> Cursor for RcBatchCursor<C> {
425
426 type Key<'a> = C::Key<'a>;
427 type KeyOwned = C::KeyOwned;
428 type Val<'a> = C::Val<'a>;
429 type ValOwned = C::ValOwned;
430 type Time = C::Time;
431 type Diff = C::Diff;
432
433 type Storage = Rc<C::Storage>;
434
435 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
436 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
437
438 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
439 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
440
441 #[inline]
442 fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, logic: L) {
443 self.cursor.map_times(storage, logic)
444 }
445
446 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
447 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
448
449 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
450 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
451
452 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
453 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
454 }
455
456 /// An immutable collection of updates.
457 impl<B: Batch> Batch for Rc<B> {
458 type Merger = RcMerger<B>;
459 }
460
461 /// Wrapper type for building reference counted batches.
462 pub struct RcBuilder<B: Builder> { builder: B }
463
464 /// Functionality for building batches from ordered update sequences.
465 impl<B: Builder> Builder for RcBuilder<B> {
466 type Item = B::Item;
467 type Time = B::Time;
468 type Output = Rc<B::Output>;
469 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } }
470 fn push(&mut self, element: Self::Item) { self.builder.push(element) }
471 fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
472 fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Rc<B::Output> { Rc::new(self.builder.done(lower, upper, since)) }
473 }
474
475 /// Wrapper type for merging reference counted batches.
476 pub struct RcMerger<B:Batch> { merger: B::Merger }
477
478 /// Represents a merge in progress.
479 impl<B:Batch> Merger<Rc<B>> for RcMerger<B> {
480 fn new(source1: &Rc<B>, source2: &Rc<B>, compaction_frontier: AntichainRef<B::Time>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } }
481 fn work(&mut self, source1: &Rc<B>, source2: &Rc<B>, fuel: &mut isize) { self.merger.work(source1, source2, fuel) }
482 fn done(self) -> Rc<B> { Rc::new(self.merger.done()) }
483 }
484}
485
486
487/// Blanket implementations for reference counted batches.
488pub mod abomonated_blanket_impls {
489 use abomonation::{Abomonation, measure};
490 use abomonation::abomonated::Abomonated;
491 use timely::progress::{Antichain, frontier::AntichainRef};
492
493 use super::{Batch, BatchReader, Builder, Merger, Cursor, Description};
494
495 impl<B: BatchReader+Abomonation> BatchReader for Abomonated<B, Vec<u8>> {
496
497 type Key<'a> = B::Key<'a>;
498 type KeyOwned = B::KeyOwned;
499 type Val<'a> = B::Val<'a>;
500 type ValOwned = B::ValOwned;
501 type Time = B::Time;
502 type Diff = B::Diff;
503
504 /// The type used to enumerate the batch's contents.
505 type Cursor = AbomonatedBatchCursor<B::Cursor>;
506 /// Acquires a cursor to the batch's contents.
507 fn cursor(&self) -> Self::Cursor {
508 AbomonatedBatchCursor::new((**self).cursor())
509 }
510
511 /// The number of updates in the batch.
512 fn len(&self) -> usize { (**self).len() }
513 /// Describes the times of the updates in the batch.
514 fn description(&self) -> &Description<Self::Time> { (**self).description() }
515 }
516
517 /// Wrapper to provide cursor to nested scope.
518 pub struct AbomonatedBatchCursor<C> {
519 cursor: C,
520 }
521
522 impl<C> AbomonatedBatchCursor<C> {
523 fn new(cursor: C) -> Self {
524 AbomonatedBatchCursor {
525 cursor,
526 }
527 }
528 }
529
530 impl<C: Cursor> Cursor for AbomonatedBatchCursor<C> where C::Storage: Abomonation {
531
532 type Key<'a> = C::Key<'a>;
533 type KeyOwned = C::KeyOwned;
534 type Val<'a> = C::Val<'a>;
535 type ValOwned = C::ValOwned;
536 type Time = C::Time;
537 type Diff = C::Diff;
538
539 type Storage = Abomonated<C::Storage, Vec<u8>>;
540
541 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
542 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
543
544 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
545 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
546
547 #[inline]
548 fn map_times<L: FnMut(&Self::Time, &Self::Diff)>(&mut self, storage: &Self::Storage, logic: L) {
549 self.cursor.map_times(storage, logic)
550 }
551
552 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
553 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
554
555 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
556 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
557
558 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
559 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
560 }
561
562 /// An immutable collection of updates.
563 impl<B: Batch+Abomonation> Batch for Abomonated<B, Vec<u8>> {
564 type Merger = AbomonatedMerger<B>;
565 }
566
567 /// Wrapper type for building reference counted batches.
568 pub struct AbomonatedBuilder<B: Builder> { builder: B }
569
570 /// Functionality for building batches from ordered update sequences.
571 impl<B: Builder> Builder for AbomonatedBuilder<B>
572 where
573 B::Output: Abomonation,
574 {
575 type Item = B::Item;
576 type Time = B::Time;
577 type Output = Abomonated<B::Output, Vec<u8>>;
578 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } }
579 fn push(&mut self, element: Self::Item) { self.builder.push(element) }
580 fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
581 fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output {
582 let batch = self.builder.done(lower, upper, since);
583 let mut bytes = Vec::with_capacity(measure(&batch));
584 unsafe { abomonation::encode(&batch, &mut bytes).unwrap() };
585 unsafe { Abomonated::<B::Output,_>::new(bytes).unwrap() }
586 }
587 }
588
589 /// Wrapper type for merging reference counted batches.
590 pub struct AbomonatedMerger<B:Batch> { merger: B::Merger }
591
592 /// Represents a merge in progress.
593 impl<B:Batch+Abomonation> Merger<Abomonated<B,Vec<u8>>> for AbomonatedMerger<B> {
594 fn new(source1: &Abomonated<B,Vec<u8>>, source2: &Abomonated<B,Vec<u8>>, compaction_frontier: AntichainRef<B::Time>) -> Self {
595 AbomonatedMerger { merger: B::begin_merge(source1, source2, compaction_frontier) }
596 }
597 fn work(&mut self, source1: &Abomonated<B,Vec<u8>>, source2: &Abomonated<B,Vec<u8>>, fuel: &mut isize) {
598 self.merger.work(source1, source2, fuel)
599 }
600 fn done(self) -> Abomonated<B, Vec<u8>> {
601 let batch = self.merger.done();
602 let mut bytes = Vec::with_capacity(measure(&batch));
603 unsafe { abomonation::encode(&batch, &mut bytes).unwrap() };
604 unsafe { Abomonated::<B,_>::new(bytes).unwrap() }
605 }
606 }
607}