Skip to main content

dbsp/
trace.rs

1//! # Traces
2//!
3//! A "trace" describes how a collection of key-value pairs changes over time.
4//! A "batch" is a mostly immutable trace.  This module provides traits and
5//! structures for expressing traces in DBSP as collections of `(key, val, time,
6//! diff)` tuples.
7//!
8//! The base trait for a trace is [`BatchReader`], which allows a trace to be
9//! read in sorted order by key and value.  `BatchReader` provides [`Cursor`] to
10//! step through a batch's tuples without modifying them.
11//!
12//! The [`Batch`] trait extends [`BatchReader`] with types and methods for
13//! creating new traces from ordered tuples ([`Batch::Builder`]) or unordered
14//! tuples ([`Batch::Batcher`]), or by merging traces of like types.
15//!
16//! The [`Trace`] trait, which also extends [`BatchReader`], adds methods to
17//! append new batches.  New tuples must not have times earlier than any of the
18//! tuples already in the trace.
19//!
20//! # Time within traces
21//!
22//! See the [time](crate::time) module documentation for a description of
23//! logical times.
24//!
//! Traces are sorted by key and value.  They are not sorted with respect to
//! time: when reading a trace, the `(time, diff)` pairs associated with a key
//! and value may appear out of order, and the same time may appear more than
//! once.
28
29use crate::circuit::metadata::OperatorMeta;
30use crate::dynamic::{ClonableTrait, DynDataTyped, DynUnit, Weight};
31use crate::storage::buffer_cache::CacheStats;
32use crate::storage::file::SerializerInner;
33use crate::storage::file::TouchedWindowCount;
34pub use crate::storage::file::{DbspSerializer, Deserializable, Deserializer, Rkyv};
35use crate::storage::file::{FilterKind, FilterStats};
36use crate::trace::cursor::{
37    DefaultPushCursor, FilteredMergeCursor, FilteredMergeCursorWithSnapshot, PushCursor,
38    UnfilteredMergeCursor,
39};
40use crate::utils::{IsNone, SupportsRoaring};
41use crate::{dynamic::ArchivedDBData, storage::buffer_cache::FBuf};
42use cursor::CursorFactory;
43use enum_map::Enum;
44use feldera_storage::fbuf::FBufSerializer;
45use feldera_storage::{FileCommitter, FileReader, StoragePath};
46use rand::{Rng, thread_rng};
47use rkyv::ser::Serializer as _;
48use size_of::SizeOf;
49use std::any::TypeId;
50use std::future::Future;
51use std::sync::Arc;
52use std::{fmt::Debug, hash::Hash};
53
54pub mod cursor;
55pub mod filter;
56pub mod layers;
57pub mod ord;
58mod sampling;
59pub mod spine_async;
60pub(crate) use sampling::sample_keys_from_batches;
61pub use spine_async::{BatchReaderWithSnapshot, ListMerger, Spine, SpineSnapshot, WithSnapshot};
62
63#[cfg(test)]
64pub mod test;
65
66pub use ord::{
67    FallbackIndexedWSet, FallbackIndexedWSetBuilder, FallbackIndexedWSetFactories,
68    FallbackKeyBatch, FallbackKeyBatchFactories, FallbackValBatch, FallbackValBatchFactories,
69    FallbackWSet, FallbackWSetBuilder, FallbackWSetFactories, FileIndexedWSet,
70    FileIndexedWSetFactories, FileKeyBatch, FileKeyBatchFactories, FileValBatch,
71    FileValBatchFactories, FileWSet, FileWSetFactories, OrdIndexedWSet, OrdIndexedWSetBuilder,
72    OrdIndexedWSetFactories, OrdKeyBatch, OrdKeyBatchFactories, OrdValBatch, OrdValBatchFactories,
73    OrdWSet, OrdWSetBuilder, OrdWSetFactories, VecIndexedWSet, VecIndexedWSetFactories,
74    VecKeyBatch, VecKeyBatchFactories, VecValBatch, VecValBatchFactories, VecWSet,
75    VecWSetFactories,
76};
77
78use rkyv::bytecheck;
79use rkyv::{Deserialize, archived_root};
80
81use crate::{
82    Error, NumEntries, Timestamp,
83    algebra::MonoidValue,
84    dynamic::{DataTrait, DynPair, DynVec, DynWeightedPairs, Erase, Factory, WeightTrait},
85    storage::file::reader::Error as ReaderError,
86};
87pub use cursor::{Cursor, MergeCursor};
88pub use filter::{BatchFilterStats, BatchFilters, Filter, GroupFilter};
89pub use layers::Trie;
90
91/// Trait for data stored in batches.
92///
93/// This trait is used as a bound on `BatchReader::Key` and `BatchReader::Val`
94/// associated types (see [`trait BatchReader`]).  Hence when writing code that
95/// must be generic over any relational data, it is sufficient to impose
96/// `DBData` as a trait bound on types.  Conversely, a trait bound of the form
97/// `B: BatchReader` implies `B::Key: DBData` and `B::Val: DBData`.
98pub trait DBData:
99    Default
100    + Clone
101    + Eq
102    + Ord
103    + Hash
104    + SizeOf
105    + Send
106    + Sync
107    + Debug
108    + ArchivedDBData
109    + IsNone<Inner: ArchivedDBData>
110    + SupportsRoaring
111    + 'static
112{
113}
114
115/// Automatically implement DBData for everything that satisfied the bounds.
116impl<T> DBData for T where
117    T: Default
118        + Clone
119        + Eq
120        + Ord
121        + Hash
122        + SizeOf
123        + Send
124        + Sync
125        + Debug
126        + ArchivedDBData
127        + IsNone<Inner: ArchivedDBData>
128        + SupportsRoaring
129        + 'static
130{
131}
132
133/// A spine that is serialized to a file.
134#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)]
135#[archive_attr(derive(rkyv::CheckBytes))]
136pub(crate) struct CommittedSpine {
137    pub batches: Vec<String>,
138    pub merged: Vec<(String, String)>,
139    pub effort: u64,
140    pub dirty: bool,
141}
142
143/// Deserializes `bytes` as type `T` using `rkyv`, tolerating `bytes` being
144/// misaligned.
145pub fn unaligned_deserialize<T: Deserializable>(bytes: &[u8]) -> T {
146    let mut aligned_bytes = FBuf::new();
147    aligned_bytes.extend_from_slice(bytes);
148    aligned_deserialize(&aligned_bytes)
149}
150
151/// Deserializes `bytes` as type `T` using `rkyv`.  `bytes` must be properly
152/// aligned.
153pub fn aligned_deserialize<T: Deserializable>(bytes: &[u8]) -> T {
154    unsafe { archived_root::<T>(bytes) }
155        .deserialize(&mut Deserializer::default())
156        .unwrap()
157}
158
159/// Trait for data types used as weights.
160///
161/// A type used for weights in a batch (i.e., as `BatchReader::R`) must behave
162/// as a monoid, i.e., a set with an associative `+` operation and a neutral
163/// element (zero).
164///
165/// Some applications use a weight as a ring, that is, require it to support
166/// multiplication too.
167///
168/// Finally, some applications require it to have `<` and `>` operations, in
169/// particular to distinguish whether something is an insertion or deletion.
170///
171/// Signed integer types such as `i32` and `i64` are suitable as weights,
172/// although if there is overflow then the results will be wrong.
173///
174/// When writing code generic over any weight type, it is sufficient to impose
175/// `DBWeight` as a trait bound on types.  Conversely, a trait bound of the form
176/// `B: BatchReader` implies `B::R: DBWeight`.
177pub trait DBWeight: DBData + MonoidValue {}
178impl<T> DBWeight for T where T: DBData + MonoidValue {}
179
180pub trait BatchReaderFactories<
181    K: DataTrait + ?Sized,
182    V: DataTrait + ?Sized,
183    T,
184    R: WeightTrait + ?Sized,
185>: Clone + Send + Sync
186{
187    // type BatchItemVTable: BatchItemTypeDescr<Key = K, Val = V, Item = I, R = R>;
188    fn new<KType, VType, RType>() -> Self
189    where
190        KType: DBData + Erase<K>,
191        VType: DBData + Erase<V>,
192        RType: DBWeight + Erase<R>;
193
194    fn key_factory(&self) -> &'static dyn Factory<K>;
195    fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>>;
196    fn val_factory(&self) -> &'static dyn Factory<V>;
197    fn weight_factory(&self) -> &'static dyn Factory<R>;
198}
199
200// TODO: use Tuple3 instead
201pub type WeightedItem<K, V, R> = DynPair<DynPair<K, V>, R>;
202
203pub trait BatchFactories<K: DataTrait + ?Sized, V: DataTrait + ?Sized, T, R: WeightTrait + ?Sized>:
204    BatchReaderFactories<K, V, T, R>
205{
206    fn item_factory(&self) -> &'static dyn Factory<DynPair<K, V>>;
207
208    fn weighted_items_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>>;
209    fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<V, R>>;
210    fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, V, R>>;
211
212    /// Factory for a vector of (T, R) or `None` if `T` is `()`.
213    fn time_diffs_factory(
214        &self,
215    ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>>;
216}
217
218/// A set of `(key, val, time, diff)` tuples that can be read and extended.
219///
220/// `Trace` extends [`BatchReader`], most notably with [`insert`][Self::insert]
221/// for adding new batches of tuples.
222///
223/// See [crate documentation](crate::trace) for more information on batches and
224/// traces.
225pub trait Trace: BatchReader {
226    /// The type of an immutable collection of updates.
227    type Batch: Batch<
228            Key = Self::Key,
229            Val = Self::Val,
230            Time = Self::Time,
231            R = Self::R,
232            Factories = Self::Factories,
233        >;
234
235    /// Allocates a new empty trace.
236    fn new(factories: &Self::Factories) -> Self;
237
238    /// Sets a compaction frontier, i.e., a timestamp such that timestamps
239    /// below the frontier are indistinguishable to DBSP, therefore any `ts`
240    /// in the trace can be safely replaced with `ts.join(frontier)` without
241    /// affecting the output of the circuit.  By applying this replacement,
242    /// updates to the same (key, value) pairs applied during different steps
243    /// can be merged or discarded.
244    ///
245    /// The compaction is performed lazily at merge time.
246    fn set_frontier(&mut self, frontier: &Self::Time);
247
248    /// Exert merge effort, even without updates.
249    fn exert(&mut self, effort: &mut isize);
250
251    /// Merge all updates in a trace into a single batch.
252    fn consolidate(self) -> Option<Self::Batch>;
253
254    /// Introduces a batch of updates to the trace.
255    ///
256    /// If the trace has too many unmerged batches, this method will block
257    /// (asynchronously) until some of them have been merged.
258    fn insert(&mut self, batch: impl Into<Arc<Self::Batch>>) -> impl Future<Output = ()>;
259
260    /// Clears the value of the "dirty" flag to `false`.
261    ///
262    /// The "dirty" flag is used to efficiently track changes to the trace,
263    /// e.g., as part of checking whether a circuit has reached a fixed point.
264    /// Pushing a non-empty batch to the trace sets the flag to `true`. The
265    /// [`Self::dirty`] method returns true iff the trace has changed since the
266    /// last call to `clear_dirty_flag`.
267    fn clear_dirty_flag(&mut self);
268
269    /// Returns the value of the dirty flag.
270    fn dirty(&self) -> bool;
271
272    /// Informs the trace that keys that don't pass the filter are no longer
273    /// used and can be removed from the trace.
274    ///
275    /// The implementation is not required to remove truncated keys instantly
276    /// or at all.  This method is just a hint that keys that don't pass the
277    /// filter are no longer of interest to the consumer of the trace and
278    /// can be garbage collected.
279    ///
280    /// # Rationale
281    ///
282    /// This API is similar to the old API `BatchReader::truncate_keys_below`,
283    /// but in [Trace] instead of [BatchReader].  The difference is that a batch
284    /// can truncate its keys instanly by simply moving an internal pointer to
285    /// the first remaining key.  However, there is no similar way to retain
286    /// keys based on arbitrary predicates, this can only be done efficiently as
287    /// part of trace maintenance when either merging or compacting batches.
288    fn retain_keys(&mut self, filter: Filter<Self::Key>);
289
290    /// Informs the trace that values that don't pass the filter are no longer
291    /// used and can be removed from the trace.
292    ///
293    /// The implementation is not required to remove truncated values instantly
294    /// or at all.  This method is just a hint that values that don't pass the
295    /// filter are no longer of interest to the consumer of the trace and
296    /// can be garbage collected.
297    fn retain_values(&mut self, filter: GroupFilter<Self::Val>);
298
299    fn key_filter(&self) -> &Option<Filter<Self::Key>>;
300    fn value_filter(&self) -> &Option<GroupFilter<Self::Val>>;
301
302    /// Writes this trace to storage beneath `base`, using `pid` as a file name
303    /// prefix.  Adds the files that were written to `files` so that they can be
304    /// committed later.
305    fn save(
306        &mut self,
307        base: &StoragePath,
308        pid: &str,
309        files: &mut Vec<Arc<dyn FileCommitter>>,
310    ) -> Result<(), Error>;
311
312    /// Reads this trace back from storage under `base` with `pid` as the
313    /// prefix.
314    fn restore(&mut self, base: &StoragePath, pid: &str) -> Result<(), Error>;
315
316    /// Allows the trace to report additional metadata.
317    fn metadata(&self, _meta: &mut OperatorMeta) {}
318
319    fn initiate_compaction(&self);
320}
321
322/// Where a batch is stored.
323#[derive(Copy, Clone, Debug, PartialEq, Eq, Enum)]
324pub enum BatchLocation {
325    /// In RAM.
326    Memory,
327
328    /// On disk.
329    Storage,
330}
331
332// impl BatchLocation {
333//     fn as_str(&self) -> &'static str {
334//         match self {
335//             Self::Memory => "memory",
336//             Self::Storage => "storage",
337//         }
338//     }
339// }
340
341/// A set of `(key, value, time, diff)` tuples whose contents may be read in
342/// order by key and value.
343///
344/// A `BatchReader` is a mostly read-only interface.  This is especially useful
345/// for views derived from other sources in ways that prevent the construction
346/// of batches from the type of data in the view (for example, filtered views,
347/// or views with extended time coordinates).
348///
349/// See [crate documentation](crate::trace) for more information on batches and
350/// traces.
351///
352/// # Object safety
353///
354/// `BatchReader` is not object safe (it cannot be used as `dyn BatchReader`),
355/// but [Cursor] is, which can often be a useful substitute.
356pub trait BatchReader: Debug + NumEntries + Rkyv + SizeOf + 'static
357where
358    Self: Sized,
359{
360    type Factories: BatchFactories<Self::Key, Self::Val, Self::Time, Self::R>;
361
362    /// Key by which updates are indexed.
363    type Key: DataTrait + ?Sized;
364
365    /// Values associated with keys.
366    type Val: DataTrait + ?Sized;
367
368    /// Timestamps associated with updates
369    type Time: Timestamp;
370
371    /// Associated update.
372    type R: WeightTrait + ?Sized;
373
374    /// The type used to enumerate the batch's contents.
375    type Cursor<'s>: Cursor<Self::Key, Self::Val, Self::Time, Self::R> + Clone + Send
376    where
377        Self: 's;
378
379    // type Consumer: Consumer<Self::Key, Self::Val, Self::R, Self::Time>;
380
381    fn factories(&self) -> Self::Factories;
382
383    /// Acquires a cursor to the batch's contents.
384    fn cursor(&self) -> Self::Cursor<'_>;
385
386    /// Acquires a [PushCursor] for the batch's contents.
387    fn push_cursor(
388        &self,
389    ) -> Box<dyn PushCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
390        Box::new(DefaultPushCursor::new(self.cursor()))
391    }
392
393    /// Acquires a [MergeCursor] for the batch's contents.
394    fn merge_cursor(
395        &self,
396        key_filter: Option<Filter<Self::Key>>,
397        value_filter: Option<GroupFilter<Self::Val>>,
398    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
399        if key_filter.is_none() && value_filter.is_none() {
400            Box::new(UnfilteredMergeCursor::new(self.cursor()))
401        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
402            Box::new(FilteredMergeCursor::new(
403                self.cursor(),
404                key_filter,
405                Some(filter),
406            ))
407        } else {
408            // Other forms of GroupFilters cannot be evaluated without a trace snapshot -- don't filter values
409            // in such cursors.
410            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
411        }
412    }
413
414    /// Similar to `merge_cursor`, but invoked in the context of a spine merger.
415    /// Takes the current spine snapshot as an extra argument and uses it to evaluate `value_filter` precisely.
416    fn merge_cursor_with_snapshot<'a, S>(
417        &'a self,
418        key_filter: Option<Filter<Self::Key>>,
419        value_filter: Option<GroupFilter<Self::Val>>,
420        snapshot: &'a Option<Arc<S>>,
421    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + 'a>
422    where
423        S: BatchReader<Key = Self::Key, Val = Self::Val, Time = Self::Time, R = Self::R>,
424    {
425        let Some(snapshot) = snapshot else {
426            return self.merge_cursor(key_filter, value_filter);
427        };
428        if key_filter.is_none() && value_filter.is_none() {
429            Box::new(UnfilteredMergeCursor::new(self.cursor()))
430        } else if value_filter.is_none() {
431            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
432        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
433            Box::new(FilteredMergeCursor::new(
434                self.cursor(),
435                key_filter,
436                Some(filter),
437            ))
438        } else {
439            Box::new(FilteredMergeCursorWithSnapshot::new(
440                self.cursor(),
441                key_filter,
442                value_filter.unwrap(),
443                snapshot,
444            ))
445        }
446    }
447
448    /// Acquires a merge cursor for the batch's contents.
449    fn consuming_cursor(
450        &mut self,
451        key_filter: Option<Filter<Self::Key>>,
452        value_filter: Option<GroupFilter<Self::Val>>,
453    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
454        self.merge_cursor(key_filter, value_filter)
455    }
456    //fn consumer(self) -> Self::Consumer;
457
458    /// The number of keys in the batch.
459    // TODO: return `(usize, Option<usize>)`, similar to
460    // `Iterator::size_hint`, since not all implementations
461    // can compute the number of keys precisely.  Same for
462    // `len()`.
463    fn key_count(&self) -> usize;
464
465    /// The number of updates in the batch.
466    fn len(&self) -> usize;
467
468    /// The memory or storage size of the batch in bytes.
469    ///
470    /// This can be an approximation, such as the size of an on-disk file for a
471    /// stored batch.
472    ///
473    /// Implementations of this function can be expensive because they might
474    /// require iterating through all the data in a batch.  Currently this is
475    /// only used to decide whether to keep the result of a merge in memory or
476    /// on storage.  For this case, the merge will visit and copy all the data
477    /// in the batch. The batch will be discarded afterward, which means that
478    /// the implementation need not attempt to cache the return value.
479    fn approximate_byte_size(&self) -> usize;
480
481    /// Statistics of the secondary membership filter used by
482    /// [Cursor::seek_key_exact] after the range filter.
483    ///
484    /// Today this is usually a Bloom filter. Batches without such a filter
485    /// should return zero/default stats.
486    fn membership_filter_stats(&self) -> FilterStats {
487        FilterStats::default()
488    }
489
490    /// Filter kind for the secondary membership filter used by
491    /// [Cursor::seek_key_exact].
492    fn membership_filter_kind(&self) -> FilterKind {
493        FilterKind::None
494    }
495
496    /// Statistics of the in-memory range filter used by
497    /// [Cursor::seek_key_exact].
498    ///
499    /// Returns range-filter stats. Batches without a range filter should
500    /// return zeroed range stats.
501    fn range_filter_stats(&self) -> FilterStats {
502        FilterStats::default()
503    }
504
505    /// Where the batch's data is stored.
506    fn location(&self) -> BatchLocation {
507        BatchLocation::Memory
508    }
509
510    /// Storage cache access statistics for this batch only.
511    ///
512    /// Most batches are in-memory, so they don't have any statistics.
513    fn cache_stats(&self) -> CacheStats {
514        CacheStats::default()
515    }
516
517    /// True if the batch is empty.
518    fn is_empty(&self) -> bool {
519        self.len() == 0
520    }
521
522    /// Returns a uniform random sample of distincts keys from the batch.
523    ///
524    /// Does not take into account the number values associated with each
525    /// key and their weights, i.e., a key that has few values is as likely
526    /// to appear in the output sample as a key with many values.
527    ///
528    /// # Arguments
529    ///
530    /// * `rng` - random number generator used to generate the sample.
531    ///
532    /// * `sample_size` - requested sample size.
533    ///
534    /// * `sample` - output
535    ///
536    /// # Invariants
537    ///
538    /// The actual sample computed by the method can be smaller than
539    /// `sample_size` even if `self` contains `>sample_size` keys.
540    ///
541    /// A correct implementation must enforce the following invariants:
542    ///
543    /// * The output sample size cannot exceed `sample_size`.
544    ///
545    /// * The output sample can only contain keys present in `self` (with
546    ///   non-zero weights).
547    ///
548    /// * If `sample_size` is greater than or equal to the number of keys
549    ///   present in `self` (with non-zero weights), the resulting sample must
550    ///   contain all such keys.
551    ///
552    /// * The output sample contains keys sorted in ascending order.
553    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
554    where
555        RG: Rng;
556
557    /// Returns num_partitions-1 keys from the batch that partition the batch into num_partitions
558    /// approximately equal size ranges 0..key1, key1..key2, ... , key_num_partitions-1..last_key_in_the_batch.
559    ///
560    /// The default implementation uses the sample_keys method to sample num_partitions^2 keys and
561    /// picks keys num_partitions, 2*num_partitions, ..,num_partitions-1*num_partitions as boundaries.
562    ///
563    /// # Arguments
564    ///
565    /// * `num_partitions` - number of partitions to create.
566    /// * `bounds` - output vector to store the partition boundaries.
567    fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<Self::Key>)
568    where
569        Self::Time: PartialEq<()>,
570    {
571        bounds.clear();
572        if num_partitions <= 1 {
573            return;
574        }
575
576        let sample_size = num_partitions * num_partitions;
577
578        let mut sample = self.factories().keys_factory().default_box();
579        self.sample_keys(&mut thread_rng(), sample_size, sample.as_mut());
580
581        // Pick evenly distributed keys as boundaries
582        let sample_len = sample.len();
583        if sample_len == 0 {
584            return;
585        }
586
587        if sample_len >= num_partitions {
588            // Pick num_bounds evenly distributed indices from the sample
589            // These divide the sample into num_bounds + 1 roughly equal parts
590            for i in 0..num_partitions - 1 {
591                let idx = ((i + 1) * sample_len) / num_partitions;
592                let idx = idx.min(sample_len - 1);
593                bounds.push_ref(sample.index(idx));
594            }
595        } else {
596            // If we have fewer samples than needed, use what we have
597            for i in 0..sample_len {
598                bounds.push_ref(sample.index(i));
599            }
600        }
601    }
602
603    /// Creates and returns a new batch that is a subset of this one, containing
604    /// only the key-value pairs whose keys are in `keys`. May also return
605    /// `None`, the default implementation, if the batch doesn't want to
606    /// implement this method.  In particular, a batch for which access through
607    /// a cursor is fast should return `None` to avoid the expense of copying
608    /// data.
609    ///
610    /// # Rationale
611    ///
612    /// This method enables performance optimizations for the case where these
613    /// assumptions hold:
614    ///
615    /// 1. Individual [Batch]es flowing through a circuit are small enough to
616    ///    fit comfortably in memory.
617    ///
618    /// 2. [Trace]s accumulated over time as a circuit executes may become large
619    ///    enough that they must be maintained in external storage.
620    ///
621    /// If an operator needs to fetch all of the data from a `trace` that
622    /// corresponds to some set of `keys`, then, given these assumptions, doing
623    /// so one key at a time with a cursor will be slow because every key fetch
624    /// potentially incurs a round trip to the storage, with total latency O(n)
625    /// in the number of keys. This method gives the batch implementation the
626    /// opportunity to implement parallel fetch for `trace.fetch(key)`, with
627    /// total latency O(1) in the number of keys.
628    #[allow(async_fn_in_trait)]
629    async fn fetch<B>(
630        &self,
631        keys: &B,
632    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
633    where
634        B: BatchReader<Key = Self::Key, Time = ()>,
635    {
636        let _ = keys;
637        None
638    }
639
640    fn keys(&self) -> Option<&DynVec<Self::Key>> {
641        None
642    }
643}
644
645impl<B> BatchReader for Arc<B>
646where
647    B: BatchReader,
648{
649    type Factories = B::Factories;
650    type Key = B::Key;
651    type Val = B::Val;
652    type Time = B::Time;
653    type R = B::R;
654    type Cursor<'s> = B::Cursor<'s>;
655    fn factories(&self) -> Self::Factories {
656        (**self).factories()
657    }
658    fn cursor(&self) -> Self::Cursor<'_> {
659        (**self).cursor()
660    }
661    fn merge_cursor(
662        &self,
663        key_filter: Option<Filter<Self::Key>>,
664        value_filter: Option<GroupFilter<Self::Val>>,
665    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
666        (**self).merge_cursor(key_filter, value_filter)
667    }
668    fn key_count(&self) -> usize {
669        (**self).key_count()
670    }
671    fn len(&self) -> usize {
672        (**self).len()
673    }
674    fn approximate_byte_size(&self) -> usize {
675        (**self).approximate_byte_size()
676    }
677    fn membership_filter_stats(&self) -> FilterStats {
678        (**self).membership_filter_stats()
679    }
680    fn membership_filter_kind(&self) -> FilterKind {
681        (**self).membership_filter_kind()
682    }
683    fn range_filter_stats(&self) -> FilterStats {
684        (**self).range_filter_stats()
685    }
686    fn location(&self) -> BatchLocation {
687        (**self).location()
688    }
689    fn cache_stats(&self) -> CacheStats {
690        (**self).cache_stats()
691    }
692    fn is_empty(&self) -> bool {
693        (**self).is_empty()
694    }
695    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
696    where
697        RG: Rng,
698    {
699        (**self).sample_keys(rng, sample_size, sample)
700    }
701    fn consuming_cursor(
702        &mut self,
703        key_filter: Option<Filter<Self::Key>>,
704        value_filter: Option<GroupFilter<Self::Val>>,
705    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
706        (**self).merge_cursor(key_filter, value_filter)
707    }
708    async fn fetch<KB>(
709        &self,
710        keys: &KB,
711    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
712    where
713        KB: BatchReader<Key = Self::Key, Time = ()>,
714    {
715        (**self).fetch(keys).await
716    }
717    fn keys(&self) -> Option<&DynVec<Self::Key>> {
718        (**self).keys()
719    }
720}
721
722/// A [`BatchReader`] plus features for constructing new batches.
723///
724/// [`Batch`] extends [`BatchReader`] with types for constructing new batches
725/// from ordered tuples ([`Self::Builder`]) or unordered tuples
726/// ([`Self::Batcher`]), or by merging traces of like types, plus some
727/// convenient methods for using those types.
728///
729/// See [crate documentation](crate::trace) for more information on batches and
730/// traces.
731pub trait Batch: BatchReader + Clone + Send + Sync
732where
733    Self: Sized,
734{
735    /// A batch type equivalent to `Self`, but with timestamp type `T` instead of `Self::Time`.
736    type Timed<T: Timestamp>: Batch<
737            Key = <Self as BatchReader>::Key,
738            Val = <Self as BatchReader>::Val,
739            Time = T,
740            R = <Self as BatchReader>::R,
741        >;
742
743    /// A type used to assemble batches from disordered updates.
744    type Batcher: Batcher<Self>;
745
746    /// A type used to assemble batches from ordered update sequences.
747    type Builder: Builder<Self>;
748
749    /// Assemble an unordered vector of weighted items into a batch.
750    #[allow(clippy::type_complexity)]
751    fn dyn_from_tuples(
752        factories: &Self::Factories,
753        time: Self::Time,
754        tuples: &mut Box<DynWeightedPairs<DynPair<Self::Key, Self::Val>, Self::R>>,
755    ) -> Self {
756        let mut batcher = Self::Batcher::new_batcher(factories, time);
757        batcher.push_batch(tuples);
758        batcher.seal()
759    }
760
761    /// Creates a new batch as a copy of `batch`, using `timestamp` for all of
762    /// the new batch's timestamps This is useful for adding a timestamp to a
763    /// batch, or for converting between different batch implementations
764    /// (e.g. writing an in-memory batch to disk).
765    ///
766    /// TODO: for adding a timestamp to a batch, this could be implemented more
767    /// efficiently by having a special batch type where all updates have the same
768    /// timestamp, as this is the only kind of batch that we ever create directly in
769    /// DBSP; batches with multiple timestamps are only created as a result of
770    /// merging.  The main complication is that we will need to extend the trace
771    /// implementation to work with batches of multiple types.  This shouldn't be
772    /// too hard and is on the todo list.
773    fn from_batch<BI>(batch: &BI, timestamp: &Self::Time, factories: &Self::Factories) -> Self
774    where
775        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
776    {
777        // Source and destination types are usually the same in the top-level scope.
778        // Optimize for this case by simply cloning the source batch. If the batch is
779        // implemented as `Arc` internally, this is essentially zero cost.
780        if TypeId::of::<BI>() == TypeId::of::<Self>() {
781            unsafe { std::mem::transmute::<&BI, &Self>(batch).clone() }
782        } else {
783            Self::from_cursor(
784                batch.cursor(),
785                timestamp,
786                factories,
787                batch.key_count(),
788                batch.len(),
789            )
790        }
791    }
792
793    /// Like `from_batch`, but avoids cloning the batch if the output type is identical to the input type.
794    fn from_arc_batch<BI>(
795        batch: &Arc<BI>,
796        timestamp: &Self::Time,
797        factories: &Self::Factories,
798    ) -> Arc<Self>
799    where
800        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
801    {
802        // Source and destination types are usually the same in the top-level scope.
803        // Optimize for this case by simply cloning the source batch. If the batch is
804        // implemented as `Arc` internally, this is essentially zero cost.
805        if TypeId::of::<BI>() == TypeId::of::<Self>() {
806            unsafe { std::mem::transmute::<&Arc<BI>, &Arc<Self>>(batch).clone() }
807        } else {
808            Arc::new(Self::from_cursor(
809                batch.cursor(),
810                timestamp,
811                factories,
812                batch.key_count(),
813                batch.len(),
814            ))
815        }
816    }
817
818    /// Creates a new batch as a copy of the tuples accessible via `cursor``,
819    /// using `timestamp` for all of the new batch's timestamps.
820    fn from_cursor<C>(
821        mut cursor: C,
822        timestamp: &Self::Time,
823        factories: &Self::Factories,
824        key_capacity: usize,
825        value_capacity: usize,
826    ) -> Self
827    where
828        C: Cursor<Self::Key, Self::Val, (), Self::R>,
829    {
830        let mut builder = Self::Builder::with_capacity(factories, key_capacity, value_capacity);
831        while cursor.key_valid() {
832            let mut any_values = false;
833            while cursor.val_valid() {
834                let weight = cursor.weight();
835                debug_assert!(!weight.is_zero());
836                builder.push_time_diff(timestamp, weight);
837                builder.push_val(cursor.val());
838                any_values = true;
839                cursor.step_val();
840            }
841            if any_values {
842                builder.push_key(cursor.key());
843            }
844            cursor.step_key();
845        }
846        builder.done()
847    }
848
849    /// Creates an empty batch.
850    fn dyn_empty(factories: &Self::Factories) -> Self {
851        Self::Builder::new_builder(factories).done()
852    }
853
854    /// Returns elements from `self` that satisfy a predicate.
855    fn filter(&self, predicate: &dyn Fn(&Self::Key, &Self::Val) -> bool) -> Self
856    where
857        Self::Time: PartialEq<()> + From<()>,
858    {
859        let factories = self.factories();
860        let mut builder = Self::Builder::new_builder(&factories);
861        let mut cursor = self.cursor();
862
863        while cursor.key_valid() {
864            let mut any_values = false;
865            while cursor.val_valid() {
866                if predicate(cursor.key(), cursor.val()) {
867                    builder.push_diff(cursor.weight());
868                    builder.push_val(cursor.val());
869                    any_values = true;
870                }
871                cursor.step_val();
872            }
873            if any_values {
874                builder.push_key(cursor.key());
875            }
876            cursor.step_key();
877        }
878
879        builder.done()
880    }
881
882    /// If this batch is not on storage, but supports writing itself to storage,
883    /// this method writes it to storage and returns the stored version.
884    fn persisted(&self) -> Option<Self> {
885        None
886    }
887
888    /// This functions returns the file that can be used to restore the batch's
889    /// contents.
890    ///
891    /// If the batch can not be persisted, this function returns None.
892    fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
893        None
894    }
895
896    fn from_path(_factories: &Self::Factories, _path: &StoragePath) -> Result<Self, ReaderError> {
897        Err(ReaderError::Unsupported)
898    }
899
900    /// Minimum and maximum keys in this batch.
901    ///
902    /// File-backed batches materialize these bounds at write time. In-memory
903    /// batches compute them from their ordered key storage. Merge builders
904    /// use these bounds to decide whether a batch span can be encoded into a
905    /// roaring bitmap.
906    ///
907    /// Returns `None` for empty batches.
908    fn key_bounds(&self) -> Option<(&Self::Key, &Self::Key)>;
909
910    /// Exact number of 16-bit roaring windows touched by the batch, relative
911    /// to the batch minimum.
912    ///
913    /// File-backed batches materialize this count at write time. In-memory
914    /// batches track it while they are built. The merge-time filter predictor
915    /// uses it together with min/max span overlap to estimate how many roaring
916    /// containers the merged batch will likely touch.
917    ///
918    /// A value of `0` means the batch cannot provide an exact count, e.g. the
919    /// key type is not roaring-compatible, the batch span does not fit in
920    /// `u32`, or the batch is empty.
921    fn touched_window_count(&self) -> TouchedWindowCount;
922
923    /// The number of tuples with negative weights in the batch.
924    ///
925    /// This metric is used in merger heuristics. Negative weights are likely to cancel out
926    /// with positive weights when merging the batch with other batches in the spine; therefore the
927    /// merger will merge such batches more aggressively than it otherwise would based on the batch
928    /// size only.
929    ///
930    /// This heuristic is not useful for all batch types, in particular negative weights in batches
931    /// produced by recursive queries do not generally cancel out. For such batches we don't track
932    /// negative weights, and this method returns `None`.
933    fn negative_weight_count(&self) -> Option<u64>;
934}
935
936/// Functionality for collecting and batching updates.
937pub trait Batcher<Output>: SizeOf
938where
939    Output: Batch,
940{
941    /// Allocates a new empty batcher.  All tuples in the batcher (and its
942    /// output batch) will have timestamp `time`.
943    fn new_batcher(vtables: &Output::Factories, time: Output::Time) -> Self;
944
945    /// Adds an unordered batch of elements to the batcher.
946    fn push_batch(
947        &mut self,
948        batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
949    );
950
951    /// Adds a consolidated batch of elements to the batcher.
952    ///
953    /// A consolidated batch is sorted and contains no duplicates or zero
954    /// weights.
955    fn push_consolidated_batch(
956        &mut self,
957        batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
958    );
959
960    /// Returns the number of tuples in the batcher.
961    fn tuples(&self) -> usize;
962
963    /// Returns all updates not greater or equal to an element of `upper`.
964    fn seal(self) -> Output;
965}
966
967/// Functionality for building batches from ordered update sequences.
968///
969/// This interface requires the client to push all of the time-diff pairs
970/// associated with a value, then the value, then all the time-diff pairs
971/// associated with the next value, then that value, and so on. Once all of the
972/// values associated with the current key have been pushed, the client pushes
973/// the key.
974///
975/// If this interface is too low-level for the client, consider wrapping it in a
976/// [TupleBuilder].
977///
978/// # Example
979///
980/// To push the following tuples:
981///
982/// ```text
983/// (k1, v1, t1, r1)
984/// (k1, v1, t2, r2)
985/// (k1, v2, t1, r1)
986/// (k1, v3, t2, r2)
987/// (k2, v1, t1, r1)
988/// (k2, v1, t2, r2)
989/// (k3, v1, t1, r1)
990/// (k4, v2, t2, r2)
991/// ```
992///
993/// the client would use:
994///
995/// ```ignore
996/// builder.push_time_diff(t1, r1);
997/// builder.push_time_diff(t2, r2);
998/// builder.push_val(v1);
999/// builder.push_time_diff(t1, r1);
1000/// builder.push_val(v2);
1001/// builder.push_time_diff(t2, r2);
1002/// builder.push_val(v3);
1003/// builder.push_key(k1);
1004/// builder.push_time_diff(t1, r1);
1005/// builder.push_time_diff(t2, r2);
1006/// builder.push_val(v1);
1007/// builder.push_key(k2);
1008/// builder.push_time_diff(t1, r1);
1009/// builder.push_val(v1);
1010/// builder.push_key(k3);
1011/// builder.push_time_diff(t2, r2);
1012/// builder.push_val(v2);
1013/// builder.push_key(k4);
1014/// ```
1015pub trait Builder<Output>: Send + SizeOf
1016where
1017    Self: Sized,
1018    Output: Batch,
1019{
1020    /// Creates a new builder with an initial capacity of 0.
1021    fn new_builder(factories: &Output::Factories) -> Self {
1022        Self::with_capacity(factories, 0, 0)
1023    }
1024
1025    /// Creates an empty builder with estimated capacities for keys and
1026    /// key-value pairs.  Only `tuple_capacity >= key_capacity` makes sense but
1027    /// implementations must tolerate contradictory capacity requests.
1028    ///
1029    /// The caller may optionally specify a preferred `location`.  The builder
1030    /// should honor it if it can, but some builders only build in one specific
1031    /// location.
1032    fn with_capacity_in_location(
1033        factories: &Output::Factories,
1034        key_capacity: usize,
1035        value_capacity: usize,
1036        location: Option<BatchLocation>,
1037    ) -> Self;
1038
1039    /// Creates an empty builder with estimated capacities for keys and
1040    /// key-value pairs.  Only `tuple_capacity >= key_capacity` makes sense but
1041    /// implementations must tolerate contradictory capacity requests.
1042    fn with_capacity(
1043        factories: &Output::Factories,
1044        key_capacity: usize,
1045        value_capacity: usize,
1046    ) -> Self {
1047        Self::with_capacity_in_location(factories, key_capacity, value_capacity, None)
1048    }
1049
1050    /// Creates an empty builder to hold the result of merging
1051    /// `batches`. Optionally, `location` can specify the preferred location for
1052    /// the result of the merge.
1053    fn for_merge<'a, B, I>(
1054        factories: &Output::Factories,
1055        batches: I,
1056        location: Option<BatchLocation>,
1057    ) -> Self
1058    where
1059        B: Batch<Key = Output::Key, Val = Output::Val, Time = Output::Time, R = Output::R>,
1060        I: IntoIterator<Item = &'a B> + Clone,
1061    {
1062        let key_capacity = batches.clone().into_iter().map(|b| b.key_count()).sum();
1063        let value_capacity = batches.into_iter().map(|b| b.len()).sum();
1064        Self::with_capacity_in_location(factories, key_capacity, value_capacity, location)
1065    }
1066
1067    /// Adds time-diff pair `(time, weight)`.
1068    fn push_time_diff(&mut self, time: &Output::Time, weight: &Output::R);
1069
1070    /// Adds time-diff pair `(time, weight)`.
1071    fn push_time_diff_mut(&mut self, time: &mut Output::Time, weight: &mut Output::R) {
1072        self.push_time_diff(time, weight);
1073    }
1074
1075    /// Adds value `val`.
1076    fn push_val(&mut self, val: &Output::Val);
1077
1078    /// Adds value `val`.
1079    fn push_val_mut(&mut self, val: &mut Output::Val) {
1080        self.push_val(val);
1081    }
1082
1083    /// Adds key `key`.
1084    fn push_key(&mut self, key: &Output::Key);
1085
1086    /// Adds key `key`.
1087    fn push_key_mut(&mut self, key: &mut Output::Key) {
1088        self.push_key(key);
1089    }
1090
1091    /// Adds time-diff pair `(), weight`.
1092    fn push_diff(&mut self, weight: &Output::R)
1093    where
1094        Output::Time: PartialEq<()>,
1095    {
1096        self.push_time_diff(&Output::Time::default(), weight);
1097    }
1098
1099    /// Adds time-diff pair `(), weight`.
1100    fn push_diff_mut(&mut self, weight: &mut Output::R)
1101    where
1102        Output::Time: PartialEq<()>,
1103    {
1104        self.push_diff(weight);
1105    }
1106
1107    /// Adds time-diff pair `(), weight` and value `val`.
1108    fn push_val_diff(&mut self, val: &Output::Val, weight: &Output::R)
1109    where
1110        Output::Time: PartialEq<()>,
1111    {
1112        self.push_time_diff(&Output::Time::default(), weight);
1113        self.push_val(val);
1114    }
1115
1116    /// Adds time-diff pair `(), weight` and value `val`.
1117    fn push_val_diff_mut(&mut self, val: &mut Output::Val, weight: &mut Output::R)
1118    where
1119        Output::Time: PartialEq<()>,
1120    {
1121        self.push_val_diff(val, weight);
1122    }
1123
1124    /// Allocates room for `additional` keys.
1125    fn reserve(&mut self, additional: usize) {
1126        let _ = additional;
1127    }
1128
1129    fn num_keys(&self) -> usize;
1130    fn num_tuples(&self) -> usize;
1131
1132    /// Completes building and returns the batch.
1133    fn done(self) -> Output;
1134}
1135
1136/// Batch builder that accepts a full tuple at a time.
1137///
1138/// This wrapper for [Builder] allows a full tuple to be added at a time.
1139pub struct TupleBuilder<B, Output>
1140where
1141    B: Builder<Output>,
1142    Output: Batch,
1143{
1144    builder: B,
1145    kv: Box<DynPair<Output::Key, Output::Val>>,
1146    has_kv: bool,
1147}
1148
1149impl<B, Output> TupleBuilder<B, Output>
1150where
1151    B: Builder<Output>,
1152    Output: Batch,
1153{
1154    pub fn new(factories: &Output::Factories, builder: B) -> Self {
1155        Self {
1156            builder,
1157            kv: factories.item_factory().default_box(),
1158            has_kv: false,
1159        }
1160    }
1161
1162    pub fn num_keys(&self) -> usize {
1163        self.builder.num_keys()
1164    }
1165
1166    pub fn num_tuples(&self) -> usize {
1167        self.builder.num_tuples()
1168    }
1169
1170    /// Adds `element` to the batch.
1171    pub fn push(&mut self, element: &mut DynPair<DynPair<Output::Key, Output::Val>, Output::R>)
1172    where
1173        Output::Time: PartialEq<()>,
1174    {
1175        let (kv, w) = element.split_mut();
1176        let (k, v) = kv.split_mut();
1177        self.push_vals(k, v, &mut Output::Time::default(), w);
1178    }
1179
1180    /// Adds tuple `(key, val, time, weight)` to the batch.
1181    pub fn push_refs(
1182        &mut self,
1183        key: &Output::Key,
1184        val: &Output::Val,
1185        time: &Output::Time,
1186        weight: &Output::R,
1187    ) {
1188        if self.has_kv {
1189            let (k, v) = self.kv.split_mut();
1190            if k != key {
1191                self.builder.push_val_mut(v);
1192                self.builder.push_key_mut(k);
1193                self.kv.from_refs(key, val);
1194            } else if v != val {
1195                self.builder.push_val_mut(v);
1196                val.clone_to(v);
1197            }
1198        } else {
1199            self.has_kv = true;
1200            self.kv.from_refs(key, val);
1201        }
1202        self.builder.push_time_diff(time, weight);
1203    }
1204
1205    /// Adds tuple `(key, val, time, weight)` to the batch.
1206    pub fn push_vals(
1207        &mut self,
1208        key: &mut Output::Key,
1209        val: &mut Output::Val,
1210        time: &mut Output::Time,
1211        weight: &mut Output::R,
1212    ) {
1213        if self.has_kv {
1214            let (k, v) = self.kv.split_mut();
1215            if k != key {
1216                self.builder.push_val_mut(v);
1217                self.builder.push_key_mut(k);
1218                self.kv.from_vals(key, val);
1219            } else if v != val {
1220                self.builder.push_val_mut(v);
1221                val.move_to(v);
1222            }
1223        } else {
1224            self.has_kv = true;
1225            self.kv.from_vals(key, val);
1226        }
1227        self.builder.push_time_diff_mut(time, weight);
1228    }
1229
1230    pub fn reserve(&mut self, additional: usize) {
1231        self.builder.reserve(additional)
1232    }
1233
1234    /// Adds all of the tuples in `iter` to the batch.
1235    pub fn extend<'a, I>(&mut self, iter: I)
1236    where
1237        Output::Time: PartialEq<()>,
1238        I: Iterator<Item = &'a mut WeightedItem<Output::Key, Output::Val, Output::R>>,
1239    {
1240        let (lower, upper) = iter.size_hint();
1241        self.reserve(upper.unwrap_or(lower));
1242
1243        for item in iter {
1244            let (kv, w) = item.split_mut();
1245            let (k, v) = kv.split_mut();
1246
1247            self.push_vals(k, v, &mut Output::Time::default(), w);
1248        }
1249    }
1250
1251    /// Completes building and returns the batch.
1252    pub fn done(mut self) -> Output {
1253        if self.has_kv {
1254            let (k, v) = self.kv.split_mut();
1255            self.builder.push_val_mut(v);
1256            self.builder.push_key_mut(k);
1257        }
1258        self.builder.done()
1259    }
1260}
1261
1262/// Merges all of the batches in `batches`, applying `key_filter` and
1263/// `value_filter`, and returns the merged result.
1264///
1265/// The filters won't be applied to batches that don't get merged at all, that
1266/// is, if `batches` contains only one non-empty batch, or if it contains two
1267/// small batches that merge to become an empty batch alongside a third larger
1268/// batch, etc.
1269pub fn merge_batches<B, T>(
1270    factories: &B::Factories,
1271    batches: T,
1272    key_filter: &Option<Filter<B::Key>>,
1273    value_filter: &Option<GroupFilter<B::Val>>,
1274) -> B
1275where
1276    T: IntoIterator<Item = B>,
1277    B: Batch,
1278{
1279    // Collect input batches, discarding empty batches.
1280    let mut batches = batches
1281        .into_iter()
1282        .filter(|b| !b.is_empty())
1283        .collect::<Vec<_>>();
1284
1285    // Merge groups of up to 64 input batches to one output batch each.
1286    //
1287    // In practice, there are <= 64 input batches and 1 output batch (or 0 if
1288    // the inputs cancel each other out).
1289    while batches.len() > 1 {
1290        let mut inputs = batches.split_off(batches.len().saturating_sub(64));
1291        let result: B = ListMerger::merge(
1292            factories,
1293            B::Builder::for_merge(factories, &inputs, Some(BatchLocation::Memory)),
1294            inputs
1295                .iter_mut()
1296                .map(|b| b.consuming_cursor(key_filter.clone(), value_filter.clone()))
1297                .collect(),
1298        );
1299        if !result.is_empty() {
1300            batches.push(result);
1301        }
1302    }
1303
1304    // Take the final output batch, or synthesize an empty one if all the
1305    // batches added up to nothing.
1306    batches.pop().unwrap_or_else(|| B::dyn_empty(factories))
1307}
1308
1309/// Merges all of the batches in `batches`, applying `key_filter` and
1310/// `value_filter`, and returns the merged result.
1311///
1312/// Every tuple will be passed through the filters.
1313pub fn merge_batches_by_reference<'a, B, T>(
1314    factories: &B::Factories,
1315    batches: T,
1316    key_filter: &Option<Filter<B::Key>>,
1317    value_filter: &Option<GroupFilter<B::Val>>,
1318) -> B
1319where
1320    T: IntoIterator<Item = &'a B>,
1321    B: Batch,
1322{
1323    // Collect input batches, discarding empty batches.
1324    let mut batches = batches
1325        .into_iter()
1326        .filter(|b| !b.is_empty())
1327        .collect::<Vec<_>>();
1328
1329    // Merge groups of up to 64 input batches to one output batch each. This
1330    // also transforms `&B` in `batches` into `B` in `outputs`.
1331    //
1332    // In practice, there are <= 64 input batches and 1 output batch (or 0 if
1333    // the inputs cancel each other out).
1334    let mut outputs = Vec::with_capacity(batches.len().div_ceil(64));
1335    while !batches.is_empty() {
1336        let inputs = batches.split_off(batches.len().saturating_sub(64));
1337        let result: B = ListMerger::merge(
1338            factories,
1339            B::Builder::for_merge(
1340                factories,
1341                inputs.iter().cloned(),
1342                Some(BatchLocation::Memory),
1343            ),
1344            inputs
1345                .into_iter()
1346                .map(|b| b.merge_cursor(key_filter.clone(), value_filter.clone()))
1347                .collect(),
1348        );
1349        if !result.is_empty() {
1350            outputs.push(result);
1351        }
1352    }
1353
1354    // Merge the output batches (in practice, either 0 or 1 of them).
1355    merge_batches(factories, outputs, key_filter, value_filter)
1356}
1357
1358/// Compares two batches for equality.  This works regardless of whether the
1359/// batches are the same type, as long as their key, value, and weight types can
1360/// be compared for equality.
1361///
1362/// This can't be implemented as `PartialEq` because that is specialized for
1363/// comparing particular batch types (often in faster ways than this generic
1364/// function).  This function is mainly useful for testing in any case.
1365pub fn eq_batch<A, B, KA, VA, RA, KB, VB, RB>(a: &A, b: &B) -> bool
1366where
1367    A: BatchReader<Key = KA, Val = VA, Time = (), R = RA>,
1368    B: BatchReader<Key = KB, Val = VB, Time = (), R = RB>,
1369    KA: PartialEq<KB> + ?Sized,
1370    VA: PartialEq<VB> + ?Sized,
1371    RA: PartialEq<RB> + ?Sized,
1372    KB: ?Sized,
1373    VB: ?Sized,
1374    RB: ?Sized,
1375{
1376    let mut c1 = a.cursor();
1377    let mut c2 = b.cursor();
1378    while c1.key_valid() && c2.key_valid() {
1379        if c1.key() != c2.key() {
1380            return false;
1381        }
1382        while c1.val_valid() && c2.val_valid() {
1383            if c1.val() != c2.val() || c1.weight() != c2.weight() {
1384                return false;
1385            }
1386            c1.step_val();
1387            c2.step_val();
1388        }
1389        if c1.val_valid() || c2.val_valid() {
1390            return false;
1391        }
1392        c1.step_key();
1393        c2.step_key();
1394    }
1395    !c1.key_valid() && !c2.key_valid()
1396}
1397
1398fn serialize_wset<B, K, R>(batch: &B) -> Vec<u8>
1399where
1400    B: BatchReader<Key = K, Val = DynUnit, Time = (), R = R>,
1401    K: DataTrait + ?Sized,
1402    R: WeightTrait + ?Sized,
1403{
1404    SerializerInner::to_fbuf_with_thread_local(|s| {
1405        let mut offsets = Vec::with_capacity(2 * batch.len());
1406        let mut cursor = batch.cursor();
1407        while cursor.key_valid() {
1408            offsets.push(cursor.key().serialize(s)?);
1409            offsets.push(cursor.weight().serialize(s)?);
1410            cursor.step_key();
1411        }
1412        s.serialize_value(&offsets)
1413    })
1414    .into_vec()
1415}
1416
1417fn deserialize_wset<B, K, R>(factories: &B::Factories, data: &[u8]) -> B
1418where
1419    B: Batch<Key = K, Val = DynUnit, Time = (), R = R>,
1420    K: DataTrait + ?Sized,
1421    R: WeightTrait + ?Sized,
1422{
1423    let offsets = unsafe { archived_root::<Vec<usize>>(data) };
1424    assert!(offsets.len() % 2 == 0);
1425    let n = offsets.len() / 2;
1426    let mut builder = B::Builder::with_capacity(factories, n, n);
1427    let mut key = factories.key_factory().default_box();
1428    let mut diff = factories.weight_factory().default_box();
1429    for i in 0..n {
1430        unsafe { key.deserialize_from_bytes(data, offsets[i * 2] as usize) };
1431        unsafe { diff.deserialize_from_bytes(data, offsets[i * 2 + 1] as usize) };
1432        builder.push_val_diff(&(), &diff);
1433        builder.push_key(&key);
1434    }
1435    builder.done()
1436}
1437
/// Marker stored in a serialized indexed Z-set's offset table after the
/// last value of each key.  The entry right after it is the key's own offset.
const SEPARATOR: u64 = u64::MAX;

/// Kind of the item most recently pushed into an `IndexedWSetSerializer`.
/// It is tracked only in debug builds, to check the push order.
#[cfg(debug_assertions)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
    /// A key (also the initial state).
    Key,
    /// A value.
    Val,
    /// A weight.
    Diff,
}
1448
1449pub struct IndexedWSetSerializer {
1450    fbuf: FBuf,
1451    offsets: Vec<usize>,
1452    n_keys: usize,
1453    n_values: usize,
1454    #[cfg(debug_assertions)]
1455    state: State,
1456}
1457
1458impl IndexedWSetSerializer {
1459    pub fn with_capacity(estimated_keys: usize, estimated_values: usize) -> Self {
1460        let mut offsets = Vec::with_capacity(2 + 2 * estimated_keys + 2 * estimated_values);
1461        offsets.push(0);
1462        offsets.push(0);
1463        Self {
1464            fbuf: FBuf::new(),
1465            offsets,
1466            n_keys: 0,
1467            n_values: 0,
1468            #[cfg(debug_assertions)]
1469            state: State::Key,
1470        }
1471    }
1472
1473    pub fn push_diff<R: WeightTrait + ?Sized>(
1474        &mut self,
1475        weight: &R,
1476        serializer_inner: &mut SerializerInner,
1477    ) {
1478        #[cfg(debug_assertions)]
1479        {
1480            debug_assert_ne!(self.state, State::Diff);
1481            self.state = State::Diff;
1482        }
1483
1484        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
1485            self.offsets.push(weight.serialize(s).unwrap())
1486        });
1487    }
1488
1489    pub fn push_val<V: DataTrait + ?Sized>(
1490        &mut self,
1491        val: &V,
1492        serializer_inner: &mut SerializerInner,
1493    ) {
1494        #[cfg(debug_assertions)]
1495        {
1496            debug_assert_eq!(self.state, State::Diff);
1497            self.state = State::Val;
1498        }
1499
1500        self.n_values += 1;
1501        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
1502            self.offsets.push(val.serialize(s).unwrap())
1503        });
1504    }
1505
1506    pub fn push_key<K: DataTrait + ?Sized>(
1507        &mut self,
1508        key: &K,
1509        serializer_inner: &mut SerializerInner,
1510    ) {
1511        #[cfg(debug_assertions)]
1512        {
1513            debug_assert_eq!(self.state, State::Val);
1514            self.state = State::Key;
1515        }
1516
1517        self.offsets.push(SEPARATOR as usize);
1518        self.n_keys += 1;
1519        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
1520            self.offsets.push(key.serialize(s).unwrap())
1521        });
1522    }
1523
1524    pub fn done(mut self, serializer_inner: &mut SerializerInner) -> FBuf {
1525        #[cfg(debug_assertions)]
1526        debug_assert_eq!(self.state, State::Key);
1527        self.offsets[0] = self.n_keys;
1528        self.offsets[1] = self.n_values;
1529        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
1530            s.serialize_value(&self.offsets).unwrap()
1531        });
1532        self.fbuf
1533    }
1534}
1535
1536pub fn serialize_indexed_wset<B, K, V, R>(batch: &B, serializer_inner: &mut SerializerInner) -> FBuf
1537where
1538    B: BatchReader<Key = K, Val = V, Time = (), R = R>,
1539    K: DataTrait + ?Sized,
1540    V: DataTrait + ?Sized,
1541    R: WeightTrait + ?Sized,
1542{
1543    let mut serializer = IndexedWSetSerializer::with_capacity(batch.key_count(), batch.len());
1544    let mut cursor = batch.cursor();
1545
1546    while cursor.key_valid() {
1547        while cursor.val_valid() {
1548            serializer.push_diff(cursor.weight(), serializer_inner);
1549            serializer.push_val(cursor.val(), serializer_inner);
1550            cursor.step_val();
1551        }
1552        serializer.push_key(cursor.key(), serializer_inner);
1553        cursor.step_key();
1554    }
1555    serializer.done(serializer_inner)
1556}
1557
1558pub fn deserialize_indexed_wset<B, K, V, R>(factories: &B::Factories, data: &[u8]) -> B
1559where
1560    B: Batch<Key = K, Val = V, Time = (), R = R>,
1561    K: DataTrait + ?Sized,
1562    V: DataTrait + ?Sized,
1563    R: WeightTrait + ?Sized,
1564{
1565    let offsets = unsafe { archived_root::<Vec<usize>>(data) };
1566    let n_keys = offsets[0] as usize;
1567    let n_values = offsets[1] as usize;
1568
1569    let mut builder = B::Builder::with_capacity(factories, n_keys, n_values);
1570    let mut key = factories.key_factory().default_box();
1571    let mut val = factories.val_factory().default_box();
1572    let mut diff = factories.weight_factory().default_box();
1573
1574    let mut current_offset = 2;
1575
1576    while current_offset < offsets.len() {
1577        while offsets[current_offset] != SEPARATOR {
1578            unsafe { diff.deserialize_from_bytes(data, offsets[current_offset] as usize) };
1579            current_offset += 1;
1580            unsafe { val.deserialize_from_bytes(data, offsets[current_offset] as usize) };
1581            current_offset += 1;
1582
1583            builder.push_val_diff(&val, &diff);
1584        }
1585        current_offset += 1;
1586
1587        unsafe { key.deserialize_from_bytes(data, offsets[current_offset] as usize) };
1588        current_offset += 1;
1589
1590        builder.push_key(&key);
1591    }
1592    builder.done()
1593}
1594
#[cfg(test)]
mod serialize_test {
    use crate::{
        DynZWeight, OrdIndexedZSet,
        algebra::OrdIndexedZSet as DynOrdIndexedZSet,
        dynamic::DynData,
        indexed_zset,
        storage::file::SerializerInner,
        trace::{BatchReader, deserialize_indexed_wset, serialize_indexed_wset},
    };

    /// Serializes `$batch` with `serialize_indexed_wset`, reads it back with
    /// `deserialize_indexed_wset`, and asserts that the round trip preserved
    /// the batch.
    macro_rules! assert_roundtrip {
        ($batch:expr) => {{
            let original = $batch;
            let bytes = serialize_indexed_wset(&*original, &mut SerializerInner::new());
            let restored = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&original.factories(), &bytes);

            assert_eq!(&*original, &restored);
        }};
    }

    #[test]
    fn test_serialize_indexed_wset() {
        // Collecting the cases into one array gives them all the type of
        // `empty`.
        let empty: OrdIndexedZSet<u64, u64> = indexed_zset! {};
        let single = indexed_zset! { 1 => { 1 => 1 } };
        let several =
            indexed_zset! { 1 => { 1 => 1, 2 => 2, 3 => 3 }, 2 => { 1 => 1, 2 => 2, 3 => 3 } };

        for batch in [empty, single, several] {
            assert_roundtrip!(batch);
        }
    }

    #[test]
    fn test_serialize_indexed_wset_tup0_key() {
        let empty: OrdIndexedZSet<(), u64> = indexed_zset! {};
        let single = indexed_zset! { () => { 1 => 1 } };

        for batch in [empty, single] {
            assert_roundtrip!(batch);
        }
    }

    #[test]
    fn test_serialize_indexed_wset_tup0_val() {
        let empty: OrdIndexedZSet<u64, ()> = indexed_zset! {};
        let single = indexed_zset! { 1 => { () => 1 } };
        let several = indexed_zset! { 1 => { () => 1 }, 2 => { () => 1 } };

        for batch in [empty, single, several] {
            assert_roundtrip!(batch);
        }
    }
}