Skip to main content

dbsp/
trace.rs

1//! # Traces
2//!
3//! A "trace" describes how a collection of key-value pairs changes over time.
4//! A "batch" is a mostly immutable trace.  This module provides traits and
5//! structures for expressing traces in DBSP as collections of `(key, val, time,
6//! diff)` tuples.
7//!
8//! The base trait for a trace is [`BatchReader`], which allows a trace to be
9//! read in sorted order by key and value.  `BatchReader` provides [`Cursor`] to
10//! step through a batch's tuples without modifying them.
11//!
12//! The [`Batch`] trait extends [`BatchReader`] with types and methods for
13//! creating new traces from ordered tuples ([`Batch::Builder`]) or unordered
14//! tuples ([`Batch::Batcher`]), or by merging traces of like types.
15//!
16//! The [`Trace`] trait, which also extends [`BatchReader`], adds methods to
17//! append new batches.  New tuples must not have times earlier than any of the
18//! tuples already in the trace.
19//!
20//! # Time within traces
21//!
22//! See the [time](crate::time) module documentation for a description of
23//! logical times.
24//!
25//! Traces are sorted by key and value.  They are not sorted with respect to
26//! time: reading a trace might obtain out of order and duplicate times among
27//! the `(time, diff)` pairs associated with a key and value.
28
29use crate::circuit::metadata::OperatorMeta;
30use crate::dynamic::{ClonableTrait, DynDataTyped, DynUnit, Weight};
31use crate::storage::buffer_cache::CacheStats;
32pub use crate::storage::file::{Deserializable, Deserializer, Rkyv, Serializer};
33use crate::trace::cursor::{
34    DefaultPushCursor, FilteredMergeCursor, FilteredMergeCursorWithSnapshot, PushCursor,
35    UnfilteredMergeCursor,
36};
37use crate::utils::IsNone;
38use crate::{dynamic::ArchivedDBData, storage::buffer_cache::FBuf};
39use cursor::CursorFactory;
40use enum_map::Enum;
41use feldera_storage::{FileCommitter, FileReader, StoragePath};
42use rand::{Rng, thread_rng};
43use rkyv::ser::Serializer as _;
44use size_of::SizeOf;
45use std::any::TypeId;
46use std::sync::Arc;
47use std::{fmt::Debug, hash::Hash};
48
49pub mod cursor;
50pub mod filter;
51pub mod layers;
52pub mod ord;
53pub mod spine_async;
54pub use spine_async::{
55    BatchReaderWithSnapshot, ListMerger, MergerType, Spine, SpineSnapshot, WithSnapshot,
56};
57
58#[cfg(test)]
59pub mod test;
60
61pub use ord::{
62    FallbackIndexedWSet, FallbackIndexedWSetBuilder, FallbackIndexedWSetFactories,
63    FallbackKeyBatch, FallbackKeyBatchFactories, FallbackValBatch, FallbackValBatchFactories,
64    FallbackWSet, FallbackWSetBuilder, FallbackWSetFactories, FileIndexedWSet,
65    FileIndexedWSetFactories, FileKeyBatch, FileKeyBatchFactories, FileValBatch,
66    FileValBatchFactories, FileWSet, FileWSetFactories, OrdIndexedWSet, OrdIndexedWSetBuilder,
67    OrdIndexedWSetFactories, OrdKeyBatch, OrdKeyBatchFactories, OrdValBatch, OrdValBatchFactories,
68    OrdWSet, OrdWSetBuilder, OrdWSetFactories, VecIndexedWSet, VecIndexedWSetFactories,
69    VecKeyBatch, VecKeyBatchFactories, VecValBatch, VecValBatchFactories, VecWSet,
70    VecWSetFactories,
71};
72
73use rkyv::{Deserialize, archived_root};
74
75use crate::storage::tracking_bloom_filter::BloomFilterStats;
76use crate::{
77    Error, NumEntries, Timestamp,
78    algebra::MonoidValue,
79    dynamic::{DataTrait, DynPair, DynVec, DynWeightedPairs, Erase, Factory, WeightTrait},
80    storage::file::reader::Error as ReaderError,
81};
82pub use cursor::{Cursor, MergeCursor};
83pub use filter::{Filter, GroupFilter};
84pub use layers::Trie;
85
/// Trait for data stored in batches.
///
/// This trait is used as a bound on `BatchReader::Key` and `BatchReader::Val`
/// associated types (see [`trait BatchReader`]).  Hence when writing code that
/// must be generic over any relational data, it is sufficient to impose
/// `DBData` as a trait bound on types.  Conversely, a trait bound of the form
/// `B: BatchReader` implies `B::Key: DBData` and `B::Val: DBData`.
///
/// `DBData` has a blanket implementation for every type that satisfies its
/// super-traits, so it never needs to be implemented manually.
pub trait DBData:
    Default
    + Clone
    + Eq
    + Ord
    + Hash
    + SizeOf
    + Send
    + Sync
    + Debug
    + ArchivedDBData
    + IsNone<Inner: ArchivedDBData>
    + 'static
{
}
108
/// Automatically implement `DBData` for everything that satisfies the bounds.
impl<T> DBData for T where
    T: Default
        + Clone
        + Eq
        + Ord
        + Hash
        + SizeOf
        + Send
        + Sync
        + Debug
        + ArchivedDBData
        + IsNone<Inner: ArchivedDBData>
        + 'static
{
}
125
/// A spine that is serialized to a file.
#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)]
pub(crate) struct CommittedSpine {
    // The batches making up the spine.
    // NOTE(review): presumably storage path names for the batch files -- confirm
    // against the spine save/restore code.
    pub batches: Vec<String>,
    // Pairs of batch identifiers recorded as merged.
    // NOTE(review): semantics inferred from the field name -- confirm.
    pub merged: Vec<(String, String)>,
    // Accumulated merge effort at the time of the commit.
    pub effort: u64,
    // Snapshot of the trace's "dirty" flag (see [`Trace::dirty`]).
    pub dirty: bool,
}
134
/// Deserializes `bytes` as type `T` using `rkyv`, tolerating `bytes` being
/// misaligned.
///
/// `rkyv` requires its input buffer to be properly aligned, so this function
/// first copies `bytes` into a freshly allocated, aligned `FBuf` and then
/// deserializes from the copy.
///
/// # Panics
///
/// Panics if deserialization fails.
pub fn unaligned_deserialize<T: Deserializable>(bytes: &[u8]) -> T {
    // Copy into an aligned buffer so that `archived_root` sees aligned data.
    let mut aligned_bytes = FBuf::new();
    aligned_bytes.extend_from_slice(bytes);
    // NOTE(review): `archived_root` performs no validation of the archived
    // bytes; callers are trusted to pass data previously serialized as `T`.
    unsafe { archived_root::<T>(&aligned_bytes[..]) }
        .deserialize(&mut Deserializer::default())
        .unwrap()
}
148
/// Trait for data types used as weights.
///
/// A type used for weights in a batch (i.e., as `BatchReader::R`) must behave
/// as a monoid, i.e., a set with an associative `+` operation and a neutral
/// element (zero).
///
/// Some applications use a weight as a ring, that is, require it to support
/// multiplication too.
///
/// Finally, some applications require it to have `<` and `>` operations, in
/// particular to distinguish whether something is an insertion or deletion.
///
/// Signed integer types such as `i32` and `i64` are suitable as weights,
/// although if there is overflow then the results will be wrong.
///
/// When writing code generic over any weight type, it is sufficient to impose
/// `DBWeight` as a trait bound on types.  Conversely, a trait bound of the form
/// `B: BatchReader` implies `B::R: DBWeight`.
pub trait DBWeight: DBData + MonoidValue {}
// Blanket impl: any `DBData` that is also a monoid is automatically a weight.
impl<T> DBWeight for T where T: DBData + MonoidValue {}
169
/// Factories for the type-erased components of a [`BatchReader`].
///
/// A factory set is constructed from concrete (statically typed) key, value,
/// and weight types via [`Self::new`], and afterwards hands out `'static`
/// [`Factory`] objects for creating the dynamically typed values stored in
/// batches.
pub trait BatchReaderFactories<
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    T,
    R: WeightTrait + ?Sized,
>: Clone + Send + Sync
{
    // type BatchItemVTable: BatchItemTypeDescr<Key = K, Val = V, Item = I, R = R>;

    /// Creates a factory set from the given concrete key, value, and weight
    /// types, which are erased to `K`, `V`, and `R` respectively.
    fn new<KType, VType, RType>() -> Self
    where
        KType: DBData + Erase<K>,
        VType: DBData + Erase<V>,
        RType: DBWeight + Erase<R>;

    /// Factory for individual keys.
    fn key_factory(&self) -> &'static dyn Factory<K>;
    /// Factory for vectors of keys.
    fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>>;
    /// Factory for individual values.
    fn val_factory(&self) -> &'static dyn Factory<V>;
    /// Factory for individual weights.
    fn weight_factory(&self) -> &'static dyn Factory<R>;
}
189
// TODO: use Tuple3 instead
/// A dynamically typed `((key, value), weight)` tuple
/// (see [`BatchFactories::weighted_item_factory`]).
pub type WeightedItem<K, V, R> = DynPair<DynPair<K, V>, R>;
192
/// Factories needed to construct new batches, extending
/// [`BatchReaderFactories`] with factories for (weighted) key/value items.
pub trait BatchFactories<K: DataTrait + ?Sized, V: DataTrait + ?Sized, T, R: WeightTrait + ?Sized>:
    BatchReaderFactories<K, V, T, R>
{
    /// Factory for `(key, value)` pairs.
    fn item_factory(&self) -> &'static dyn Factory<DynPair<K, V>>;

    /// Factory for vectors of weighted `(key, value)` pairs.
    fn weighted_items_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>>;
    /// Factory for vectors of weighted values.
    fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<V, R>>;
    /// Factory for a single weighted `(key, value)` pair.
    fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, V, R>>;

    /// Factory for a vector of (T, R) or `None` if `T` is `()`.
    fn time_diffs_factory(
        &self,
    ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>>;
}
207
/// A set of `(key, val, time, diff)` tuples that can be read and extended.
///
/// `Trace` extends [`BatchReader`], most notably with [`insert`][Self::insert]
/// for adding new batches of tuples.
///
/// See [crate documentation](crate::trace) for more information on batches and
/// traces.
pub trait Trace: BatchReader {
    /// The type of an immutable collection of updates.
    type Batch: Batch<
            Key = Self::Key,
            Val = Self::Val,
            Time = Self::Time,
            R = Self::R,
            Factories = Self::Factories,
        >;

    /// Allocates a new empty trace.
    fn new(factories: &Self::Factories) -> Self;

    /// Sets a compaction frontier, i.e., a timestamp such that timestamps
    /// below the frontier are indistinguishable to DBSP, therefore any `ts`
    /// in the trace can be safely replaced with `ts.join(frontier)` without
    /// affecting the output of the circuit.  By applying this replacement,
    /// updates to the same (key, value) pairs applied during different steps
    /// can be merged or discarded.
    ///
    /// The compaction is performed lazily at merge time.
    fn set_frontier(&mut self, frontier: &Self::Time);

    /// Exert merge effort, even without updates.
    fn exert(&mut self, effort: &mut isize);

    /// Merge all updates in a trace into a single batch.
    fn consolidate(self) -> Option<Self::Batch>;

    /// Introduces a batch of updates to the trace.
    fn insert(&mut self, batch: Self::Batch);

    /// Introduces a batch of updates to the trace. More efficient than cloning
    /// a batch and calling `insert`.
    fn insert_arc(&mut self, batch: Arc<Self::Batch>);

    /// Clears the value of the "dirty" flag to `false`.
    ///
    /// The "dirty" flag is used to efficiently track changes to the trace,
    /// e.g., as part of checking whether a circuit has reached a fixed point.
    /// Pushing a non-empty batch to the trace sets the flag to `true`. The
    /// [`Self::dirty`] method returns true iff the trace has changed since the
    /// last call to `clear_dirty_flag`.
    fn clear_dirty_flag(&mut self);

    /// Returns the value of the dirty flag.
    fn dirty(&self) -> bool;

    /// Informs the trace that keys that don't pass the filter are no longer
    /// used and can be removed from the trace.
    ///
    /// The implementation is not required to remove truncated keys instantly
    /// or at all.  This method is just a hint that keys that don't pass the
    /// filter are no longer of interest to the consumer of the trace and
    /// can be garbage collected.
    ///
    /// # Rationale
    ///
    /// This API is similar to the old API `BatchReader::truncate_keys_below`,
    /// but in [Trace] instead of [BatchReader].  The difference is that a batch
    /// can truncate its keys instantly by simply moving an internal pointer to
    /// the first remaining key.  However, there is no similar way to retain
    /// keys based on arbitrary predicates, this can only be done efficiently as
    /// part of trace maintenance when either merging or compacting batches.
    fn retain_keys(&mut self, filter: Filter<Self::Key>);

    /// Informs the trace that values that don't pass the filter are no longer
    /// used and can be removed from the trace.
    ///
    /// The implementation is not required to remove truncated values instantly
    /// or at all.  This method is just a hint that values that don't pass the
    /// filter are no longer of interest to the consumer of the trace and
    /// can be garbage collected.
    fn retain_values(&mut self, filter: GroupFilter<Self::Val>);

    /// The currently installed key filter (see [`Self::retain_keys`]), if any.
    fn key_filter(&self) -> &Option<Filter<Self::Key>>;
    /// The currently installed value filter (see [`Self::retain_values`]), if
    /// any.
    fn value_filter(&self) -> &Option<GroupFilter<Self::Val>>;

    /// Writes this trace to storage beneath `base`, using `pid` as a file name
    /// prefix.  Adds the files that were written to `files` so that they can be
    /// committed later.
    fn save(
        &mut self,
        base: &StoragePath,
        pid: &str,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), Error>;

    /// Reads this trace back from storage under `base` with `pid` as the
    /// prefix.
    fn restore(&mut self, base: &StoragePath, pid: &str) -> Result<(), Error>;

    /// Allows the trace to report additional metadata.
    fn metadata(&self, _meta: &mut OperatorMeta) {}
}
310
/// Where a batch is stored: in memory or on secondary storage.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Enum)]
pub enum BatchLocation {
    /// In RAM.
    Memory,

    /// On disk.
    Storage,
}
320
321// impl BatchLocation {
322//     fn as_str(&self) -> &'static str {
323//         match self {
324//             Self::Memory => "memory",
325//             Self::Storage => "storage",
326//         }
327//     }
328// }
329
/// A set of `(key, value, time, diff)` tuples whose contents may be read in
/// order by key and value.
///
/// A `BatchReader` is a mostly read-only interface.  This is especially useful
/// for views derived from other sources in ways that prevent the construction
/// of batches from the type of data in the view (for example, filtered views,
/// or views with extended time coordinates).
///
/// See [crate documentation](crate::trace) for more information on batches and
/// traces.
///
/// # Object safety
///
/// `BatchReader` is not object safe (it cannot be used as `dyn BatchReader`),
/// but [Cursor] is, which can often be a useful substitute.
pub trait BatchReader: Debug + NumEntries + Rkyv + SizeOf + 'static
where
    Self: Sized,
{
    /// Factories for this batch's type-erased components.
    type Factories: BatchFactories<Self::Key, Self::Val, Self::Time, Self::R>;

    /// Key by which updates are indexed.
    type Key: DataTrait + ?Sized;

    /// Values associated with keys.
    type Val: DataTrait + ?Sized;

    /// Timestamps associated with updates.
    type Time: Timestamp;

    /// Associated update.
    type R: WeightTrait + ?Sized;

    /// The type used to enumerate the batch's contents.
    type Cursor<'s>: Cursor<Self::Key, Self::Val, Self::Time, Self::R> + Clone + Send
    where
        Self: 's;

    // type Consumer: Consumer<Self::Key, Self::Val, Self::R, Self::Time>;

    /// Returns the factories used to construct this batch's components.
    fn factories(&self) -> Self::Factories;

    /// Acquires a cursor to the batch's contents.
    fn cursor(&self) -> Self::Cursor<'_>;

    /// Acquires a [PushCursor] for the batch's contents.
    ///
    /// The default implementation wraps the ordinary [`Self::cursor`].
    fn push_cursor(
        &self,
    ) -> Box<dyn PushCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        Box::new(DefaultPushCursor::new(self.cursor()))
    }

    /// Acquires a [MergeCursor] for the batch's contents, optionally filtering
    /// keys with `key_filter` and values with `value_filter`.
    fn merge_cursor(
        &self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        if key_filter.is_none() && value_filter.is_none() {
            // No filtering requested: use the cheaper unfiltered cursor.
            Box::new(UnfilteredMergeCursor::new(self.cursor()))
        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
            Box::new(FilteredMergeCursor::new(
                self.cursor(),
                key_filter,
                Some(filter),
            ))
        } else {
            // Other forms of GroupFilters cannot be evaluated without a trace snapshot -- don't filter values
            // in such cursors.
            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
        }
    }

    /// Similar to `merge_cursor`, but invoked in the context of a spine merger.
    /// Takes the current spine snapshot as an extra argument and uses it to evaluate `value_filter` precisely.
    fn merge_cursor_with_snapshot<'a, S>(
        &'a self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
        snapshot: &'a Option<Arc<S>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + 'a>
    where
        S: BatchReader<Key = Self::Key, Val = Self::Val, Time = Self::Time, R = Self::R>,
    {
        // Without a snapshot we can do no better than `merge_cursor`.
        let Some(snapshot) = snapshot else {
            return self.merge_cursor(key_filter, value_filter);
        };
        if key_filter.is_none() && value_filter.is_none() {
            Box::new(UnfilteredMergeCursor::new(self.cursor()))
        } else if value_filter.is_none() {
            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
            // Simple value filters don't need the snapshot.
            Box::new(FilteredMergeCursor::new(
                self.cursor(),
                key_filter,
                Some(filter),
            ))
        } else {
            // Non-simple filters are evaluated against the snapshot.
            Box::new(FilteredMergeCursorWithSnapshot::new(
                self.cursor(),
                key_filter,
                value_filter.unwrap(),
                snapshot,
            ))
        }
    }

    /// Acquires a merge cursor that is allowed to consume the batch's
    /// contents.  The default implementation simply delegates to
    /// [`Self::merge_cursor`].
    fn consuming_cursor(
        &mut self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        self.merge_cursor(key_filter, value_filter)
    }
    //fn consumer(self) -> Self::Consumer;

    /// The number of keys in the batch.
    // TODO: return `(usize, Option<usize>)`, similar to
    // `Iterator::size_hint`, since not all implementations
    // can compute the number of keys precisely.  Same for
    // `len()`.
    fn key_count(&self) -> usize;

    /// The number of updates in the batch.
    fn len(&self) -> usize;

    /// The memory or storage size of the batch in bytes.
    ///
    /// This can be an approximation, such as the size of an on-disk file for a
    /// stored batch.
    ///
    /// Implementations of this function can be expensive because they might
    /// require iterating through all the data in a batch.  Currently this is
    /// only used to decide whether to keep the result of a merge in memory or
    /// on storage.  For this case, the merge will visit and copy all the data
    /// in the batch. The batch will be discarded afterward, which means that
    /// the implementation need not attempt to cache the return value.
    fn approximate_byte_size(&self) -> usize;

    /// Statistics of the Bloom filter used by [Cursor::seek_key_exact].
    /// The Bloom filter (kept in memory) is used there to quickly check
    /// whether a key might be present in the batch, before doing a
    /// binary tree lookup within the batch to be exactly sure.
    /// The statistics include for example the size in bytes and the hit rate.
    /// Only some kinds of batches use a filter; others should return
    /// `BloomFilterStats::default()`.
    fn filter_stats(&self) -> BloomFilterStats;

    /// Where the batch's data is stored.  Defaults to [`BatchLocation::Memory`].
    fn location(&self) -> BatchLocation {
        BatchLocation::Memory
    }

    /// Storage cache access statistics for this batch only.
    ///
    /// Most batches are in-memory, so they don't have any statistics.
    fn cache_stats(&self) -> CacheStats {
        CacheStats::default()
    }

    /// True if the batch is empty.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// A method that returns either true (possibly in the batch) or false
    /// (definitely not in the batch).
    ///
    /// The default implementation conservatively always answers "possibly".
    fn maybe_contains_key(&self, _hash: u64) -> bool {
        true
    }

    /// Returns a uniform random sample of distinct keys from the batch.
    ///
    /// Does not take into account the number values associated with each
    /// key and their weights, i.e., a key that has few values is as likely
    /// to appear in the output sample as a key with many values.
    ///
    /// # Arguments
    ///
    /// * `rng` - random number generator used to generate the sample.
    ///
    /// * `sample_size` - requested sample size.
    ///
    /// * `sample` - output
    ///
    /// # Invariants
    ///
    /// The actual sample computed by the method can be smaller than
    /// `sample_size` even if `self` contains `>sample_size` keys.
    ///
    /// A correct implementation must enforce the following invariants:
    ///
    /// * The output sample size cannot exceed `sample_size`.
    ///
    /// * The output sample can only contain keys present in `self` (with
    ///   non-zero weights).
    ///
    /// * If `sample_size` is greater than or equal to the number of keys
    ///   present in `self` (with non-zero weights), the resulting sample must
    ///   contain all such keys.
    ///
    /// * The output sample contains keys sorted in ascending order.
    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
        RG: Rng;

    /// Returns `num_partitions - 1` keys from the batch that partition the
    /// batch into `num_partitions` approximately equal size ranges
    /// `0..key1`, `key1..key2`, ..., `key_{num_partitions-1}..last_key_in_the_batch`.
    ///
    /// The default implementation uses the `sample_keys` method to sample
    /// `num_partitions^2` keys and picks the keys at indices `num_partitions`,
    /// `2*num_partitions`, ..., `(num_partitions-1)*num_partitions` as
    /// boundaries.
    ///
    /// # Arguments
    ///
    /// * `num_partitions` - number of partitions to create.
    /// * `bounds` - output vector to store the partition boundaries.
    fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
    {
        bounds.clear();
        // A single partition needs no boundaries.
        if num_partitions <= 1 {
            return;
        }

        let sample_size = num_partitions * num_partitions;

        let mut sample = self.factories().keys_factory().default_box();
        self.sample_keys(&mut thread_rng(), sample_size, sample.as_mut());

        // Pick evenly distributed keys as boundaries
        let sample_len = sample.len();
        if sample_len == 0 {
            return;
        }

        if sample_len >= num_partitions {
            // Pick `num_partitions - 1` evenly distributed indices from the
            // sample; these divide the sample into roughly equal parts.
            for i in 0..num_partitions - 1 {
                let idx = ((i + 1) * sample_len) / num_partitions;
                let idx = idx.min(sample_len - 1);
                bounds.push_ref(sample.index(idx));
            }
        } else {
            // If we have fewer samples than needed, use what we have
            for i in 0..sample_len {
                bounds.push_ref(sample.index(i));
            }
        }
    }

    /// Creates and returns a new batch that is a subset of this one, containing
    /// only the key-value pairs whose keys are in `keys`. May also return
    /// `None`, the default implementation, if the batch doesn't want to
    /// implement this method.  In particular, a batch for which access through
    /// a cursor is fast should return `None` to avoid the expense of copying
    /// data.
    ///
    /// # Rationale
    ///
    /// This method enables performance optimizations for the case where these
    /// assumptions hold:
    ///
    /// 1. Individual [Batch]es flowing through a circuit are small enough to
    ///    fit comfortably in memory.
    ///
    /// 2. [Trace]s accumulated over time as a circuit executes may become large
    ///    enough that they must be maintained in external storage.
    ///
    /// If an operator needs to fetch all of the data from a `trace` that
    /// corresponds to some set of `keys`, then, given these assumptions, doing
    /// so one key at a time with a cursor will be slow because every key fetch
    /// potentially incurs a round trip to the storage, with total latency O(n)
    /// in the number of keys. This method gives the batch implementation the
    /// opportunity to implement parallel fetch for `trace.fetch(key)`, with
    /// total latency O(1) in the number of keys.
    #[allow(async_fn_in_trait)]
    async fn fetch<B>(
        &self,
        keys: &B,
    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
    where
        B: BatchReader<Key = Self::Key, Time = ()>,
    {
        let _ = keys;
        None
    }

    /// Direct access to the batch's keys as a vector, when the implementation
    /// keeps them in that form.  The default implementation returns `None`.
    fn keys(&self) -> Option<&DynVec<Self::Key>> {
        None
    }
}
625
626impl<B> BatchReader for Arc<B>
627where
628    B: BatchReader,
629{
630    type Factories = B::Factories;
631    type Key = B::Key;
632    type Val = B::Val;
633    type Time = B::Time;
634    type R = B::R;
635    type Cursor<'s> = B::Cursor<'s>;
636    fn factories(&self) -> Self::Factories {
637        (**self).factories()
638    }
639    fn cursor(&self) -> Self::Cursor<'_> {
640        (**self).cursor()
641    }
642    fn merge_cursor(
643        &self,
644        key_filter: Option<Filter<Self::Key>>,
645        value_filter: Option<GroupFilter<Self::Val>>,
646    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
647        (**self).merge_cursor(key_filter, value_filter)
648    }
649    fn key_count(&self) -> usize {
650        (**self).key_count()
651    }
652    fn len(&self) -> usize {
653        (**self).len()
654    }
655    fn approximate_byte_size(&self) -> usize {
656        (**self).approximate_byte_size()
657    }
658    fn filter_stats(&self) -> BloomFilterStats {
659        (**self).filter_stats()
660    }
661    fn location(&self) -> BatchLocation {
662        (**self).location()
663    }
664    fn cache_stats(&self) -> CacheStats {
665        (**self).cache_stats()
666    }
667    fn is_empty(&self) -> bool {
668        (**self).is_empty()
669    }
670    fn maybe_contains_key(&self, hash: u64) -> bool {
671        (**self).maybe_contains_key(hash)
672    }
673    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
674    where
675        Self::Time: PartialEq<()>,
676        RG: Rng,
677    {
678        (**self).sample_keys(rng, sample_size, sample)
679    }
680    fn consuming_cursor(
681        &mut self,
682        key_filter: Option<Filter<Self::Key>>,
683        value_filter: Option<GroupFilter<Self::Val>>,
684    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
685        (**self).merge_cursor(key_filter, value_filter)
686    }
687    async fn fetch<KB>(
688        &self,
689        keys: &KB,
690    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
691    where
692        KB: BatchReader<Key = Self::Key, Time = ()>,
693    {
694        (**self).fetch(keys).await
695    }
696    fn keys(&self) -> Option<&DynVec<Self::Key>> {
697        (**self).keys()
698    }
699}
700
701/// A [`BatchReader`] plus features for constructing new batches.
702///
703/// [`Batch`] extends [`BatchReader`] with types for constructing new batches
704/// from ordered tuples ([`Self::Builder`]) or unordered tuples
705/// ([`Self::Batcher`]), or by merging traces of like types, plus some
706/// convenient methods for using those types.
707///
708/// See [crate documentation](crate::trace) for more information on batches and
709/// traces.
710pub trait Batch: BatchReader + Clone + Send + Sync
711where
712    Self: Sized,
713{
714    /// A batch type equivalent to `Self`, but with timestamp type `T` instead of `Self::Time`.
715    type Timed<T: Timestamp>: Batch<
716            Key = <Self as BatchReader>::Key,
717            Val = <Self as BatchReader>::Val,
718            Time = T,
719            R = <Self as BatchReader>::R,
720        >;
721
722    /// A type used to assemble batches from disordered updates.
723    type Batcher: Batcher<Self>;
724
725    /// A type used to assemble batches from ordered update sequences.
726    type Builder: Builder<Self>;
727
728    /// Assemble an unordered vector of weighted items into a batch.
729    #[allow(clippy::type_complexity)]
730    fn dyn_from_tuples(
731        factories: &Self::Factories,
732        time: Self::Time,
733        tuples: &mut Box<DynWeightedPairs<DynPair<Self::Key, Self::Val>, Self::R>>,
734    ) -> Self {
735        let mut batcher = Self::Batcher::new_batcher(factories, time);
736        batcher.push_batch(tuples);
737        batcher.seal()
738    }
739
740    /// Creates a new batch as a copy of `batch`, using `timestamp` for all of
741    /// the new batch's timestamps This is useful for adding a timestamp to a
742    /// batch, or for converting between different batch implementations
743    /// (e.g. writing an in-memory batch to disk).
744    ///
745    /// TODO: for adding a timestamp to a batch, this could be implemented more
746    /// efficiently by having a special batch type where all updates have the same
747    /// timestamp, as this is the only kind of batch that we ever create directly in
748    /// DBSP; batches with multiple timestamps are only created as a result of
749    /// merging.  The main complication is that we will need to extend the trace
750    /// implementation to work with batches of multiple types.  This shouldn't be
751    /// too hard and is on the todo list.
752    fn from_batch<BI>(batch: &BI, timestamp: &Self::Time, factories: &Self::Factories) -> Self
753    where
754        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
755    {
756        // Source and destination types are usually the same in the top-level scope.
757        // Optimize for this case by simply cloning the source batch. If the batch is
758        // implemented as `Arc` internally, this is essentially zero cost.
759        if TypeId::of::<BI>() == TypeId::of::<Self>() {
760            unsafe { std::mem::transmute::<&BI, &Self>(batch).clone() }
761        } else {
762            Self::from_cursor(
763                batch.cursor(),
764                timestamp,
765                factories,
766                batch.key_count(),
767                batch.len(),
768            )
769        }
770    }
771
    /// Like `from_batch`, but avoids cloning the batch if the output type is identical to the input type.
    fn from_arc_batch<BI>(
        batch: &Arc<BI>,
        timestamp: &Self::Time,
        factories: &Self::Factories,
    ) -> Arc<Self>
    where
        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
    {
        // Source and destination types are usually the same in the top-level scope.
        // Optimize for this case by simply cloning the source batch. If the batch is
        // implemented as `Arc` internally, this is essentially zero cost.
        if TypeId::of::<BI>() == TypeId::of::<Self>() {
            // SAFETY: the `TypeId` equality check above proves that `BI` and
            // `Self` are the same concrete type, so `Arc<BI>` and `Arc<Self>`
            // have identical layouts; cloning the `Arc` only bumps a refcount.
            unsafe { std::mem::transmute::<&Arc<BI>, &Arc<Self>>(batch).clone() }
        } else {
            // Different concrete types: fall back to a tuple-by-tuple copy
            // and wrap the freshly built batch in a new `Arc`.
            Arc::new(Self::from_cursor(
                batch.cursor(),
                timestamp,
                factories,
                batch.key_count(),
                batch.len(),
            ))
        }
    }
796
797    /// Creates a new batch as a copy of the tuples accessible via `cursor``,
798    /// using `timestamp` for all of the new batch's timestamps.
799    fn from_cursor<C>(
800        mut cursor: C,
801        timestamp: &Self::Time,
802        factories: &Self::Factories,
803        key_capacity: usize,
804        value_capacity: usize,
805    ) -> Self
806    where
807        C: Cursor<Self::Key, Self::Val, (), Self::R>,
808    {
809        let mut builder = Self::Builder::with_capacity(factories, key_capacity, value_capacity);
810        while cursor.key_valid() {
811            let mut any_values = false;
812            while cursor.val_valid() {
813                let weight = cursor.weight();
814                debug_assert!(!weight.is_zero());
815                builder.push_time_diff(timestamp, weight);
816                builder.push_val(cursor.val());
817                any_values = true;
818                cursor.step_val();
819            }
820            if any_values {
821                builder.push_key(cursor.key());
822            }
823            cursor.step_key();
824        }
825        builder.done()
826    }
827
    /// Creates an empty batch.
    fn dyn_empty(factories: &Self::Factories) -> Self {
        // A fresh builder with nothing pushed into it yields an empty batch.
        Self::Builder::new_builder(factories).done()
    }
832
833    /// Returns elements from `self` that satisfy a predicate.
834    fn filter(&self, predicate: &dyn Fn(&Self::Key, &Self::Val) -> bool) -> Self
835    where
836        Self::Time: PartialEq<()> + From<()>,
837    {
838        let factories = self.factories();
839        let mut builder = Self::Builder::new_builder(&factories);
840        let mut cursor = self.cursor();
841
842        while cursor.key_valid() {
843            let mut any_values = false;
844            while cursor.val_valid() {
845                if predicate(cursor.key(), cursor.val()) {
846                    builder.push_diff(cursor.weight());
847                    builder.push_val(cursor.val());
848                    any_values = true;
849                }
850                cursor.step_val();
851            }
852            if any_values {
853                builder.push_key(cursor.key());
854            }
855            cursor.step_key();
856        }
857
858        builder.done()
859    }
860
    /// If this batch is not on storage, but supports writing itself to storage,
    /// this method writes it to storage and returns the stored version.
    ///
    /// The default implementation returns `None`, for batch types that do not
    /// support persisting themselves.
    fn persisted(&self) -> Option<Self> {
        None
    }
866
    /// Returns the file that can be used to restore the batch's contents.
    ///
    /// If the batch cannot be persisted, this function returns `None`, which
    /// is what the default implementation does.
    fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
        None
    }
874
    /// Reads a batch back from `_path` on storage.
    ///
    /// The default implementation reports that restoring from storage is
    /// unsupported for this batch type.
    fn from_path(_factories: &Self::Factories, _path: &StoragePath) -> Result<Self, ReaderError> {
        Err(ReaderError::Unsupported)
    }
878}
879
/// Functionality for collecting and batching updates.
pub trait Batcher<Output>: SizeOf
where
    Output: Batch,
{
    /// Allocates a new empty batcher.  All tuples in the batcher (and its
    /// output batch) will have timestamp `time`.
    fn new_batcher(vtables: &Output::Factories, time: Output::Time) -> Self;

    /// Adds an unordered batch of elements to the batcher.
    fn push_batch(
        &mut self,
        batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
    );

    /// Adds a consolidated batch of elements to the batcher.
    ///
    /// A consolidated batch is sorted and contains no duplicates or zero
    /// weights.
    fn push_consolidated_batch(
        &mut self,
        batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
    );

    /// Returns the number of tuples in the batcher.
    fn tuples(&self) -> usize;

    /// Consumes the batcher and returns a batch containing all of the updates
    /// pushed into it.
    fn seal(self) -> Output;
}
910
/// Functionality for building batches from ordered update sequences.
///
/// This interface requires the client to push all of the time-diff pairs
/// associated with a value, then the value, then all the time-diff pairs
/// associated with the next value, then that value, and so on. Once all of the
/// values associated with the current key have been pushed, the client pushes
/// the key.
///
/// If this interface is too low-level for the client, consider wrapping it in a
/// [TupleBuilder].
///
/// # Example
///
/// To push the following tuples:
///
/// ```text
/// (k1, v1, t1, r1)
/// (k1, v1, t2, r2)
/// (k1, v2, t1, r1)
/// (k1, v3, t2, r2)
/// (k2, v1, t1, r1)
/// (k2, v1, t2, r2)
/// (k3, v1, t1, r1)
/// (k4, v2, t2, r2)
/// ```
///
/// the client would use:
///
/// ```ignore
/// builder.push_time_diff(t1, r1);
/// builder.push_time_diff(t2, r2);
/// builder.push_val(v1);
/// builder.push_time_diff(t1, r1);
/// builder.push_val(v2);
/// builder.push_time_diff(t2, r2);
/// builder.push_val(v3);
/// builder.push_key(k1);
/// builder.push_time_diff(t1, r1);
/// builder.push_time_diff(t2, r2);
/// builder.push_val(v1);
/// builder.push_key(k2);
/// builder.push_time_diff(t1, r1);
/// builder.push_val(v1);
/// builder.push_key(k3);
/// builder.push_time_diff(t2, r2);
/// builder.push_val(v2);
/// builder.push_key(k4);
/// ```
pub trait Builder<Output>: SizeOf
where
    Self: Sized,
    Output: Batch,
{
    /// Creates a new builder with an initial capacity of 0.
    fn new_builder(factories: &Output::Factories) -> Self {
        Self::with_capacity(factories, 0, 0)
    }

    /// Creates an empty builder with estimated capacities for keys and
    /// key-value pairs.  Only `value_capacity >= key_capacity` makes sense but
    /// implementations must tolerate contradictory capacity requests.
    fn with_capacity(
        factories: &Output::Factories,
        key_capacity: usize,
        value_capacity: usize,
    ) -> Self;

    /// Creates an empty builder to hold the result of merging
    /// `batches`. Optionally, `location` can specify the preferred location for
    /// the result of the merge.
    ///
    /// The default implementation ignores `location` and sizes the builder for
    /// the sum of the inputs' key and tuple counts (an upper bound on the
    /// merged result, since merging can only consolidate tuples away).
    fn for_merge<'a, B, I>(
        factories: &Output::Factories,
        batches: I,
        location: Option<BatchLocation>,
    ) -> Self
    where
        B: BatchReader,
        I: IntoIterator<Item = &'a B> + Clone,
    {
        let _ = location;
        let key_capacity = batches.clone().into_iter().map(|b| b.key_count()).sum();
        let value_capacity = batches.into_iter().map(|b| b.len()).sum();
        Self::with_capacity(factories, key_capacity, value_capacity)
    }

    /// Adds time-diff pair `(time, weight)`.
    fn push_time_diff(&mut self, time: &Output::Time, weight: &Output::R);

    /// Adds time-diff pair `(time, weight)`.
    ///
    /// Taking `&mut` arguments lets implementations move the values out
    /// instead of cloning; the default just forwards to [`push_time_diff`].
    ///
    /// [`push_time_diff`]: Self::push_time_diff
    fn push_time_diff_mut(&mut self, time: &mut Output::Time, weight: &mut Output::R) {
        self.push_time_diff(time, weight);
    }

    /// Adds value `val`.
    fn push_val(&mut self, val: &Output::Val);

    /// Adds value `val`, possibly consuming it (see [`Self::push_time_diff_mut`]).
    fn push_val_mut(&mut self, val: &mut Output::Val) {
        self.push_val(val);
    }

    /// Adds key `key`.
    fn push_key(&mut self, key: &Output::Key);

    /// Adds key `key`, possibly consuming it (see [`Self::push_time_diff_mut`]).
    fn push_key_mut(&mut self, key: &mut Output::Key) {
        self.push_key(key);
    }

    /// Adds time-diff pair `((), weight)`, for batches whose time type is
    /// equivalent to `()`.
    fn push_diff(&mut self, weight: &Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_time_diff(&Output::Time::default(), weight);
    }

    /// Adds time-diff pair `((), weight)`, for batches whose time type is
    /// equivalent to `()`.
    fn push_diff_mut(&mut self, weight: &mut Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_diff(weight);
    }

    /// Adds time-diff pair `((), weight)` and value `val`.
    fn push_val_diff(&mut self, val: &Output::Val, weight: &Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_time_diff(&Output::Time::default(), weight);
        self.push_val(val);
    }

    /// Adds time-diff pair `((), weight)` and value `val`.
    fn push_val_diff_mut(&mut self, val: &mut Output::Val, weight: &mut Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_val_diff(val, weight);
    }

    /// Allocates room for `additional` keys.  The default implementation is a
    /// no-op.
    fn reserve(&mut self, additional: usize) {
        let _ = additional;
    }

    /// Returns the number of keys pushed so far.
    fn num_keys(&self) -> usize;
    /// Returns the number of tuples pushed so far.
    fn num_tuples(&self) -> usize;

    /// Completes building and returns the batch.
    fn done(self) -> Output;
}
1064
/// Batch builder that accepts a full tuple at a time.
///
/// This wrapper for [Builder] allows a full tuple to be added at a time.
pub struct TupleBuilder<B, Output>
where
    B: Builder<Output>,
    Output: Batch,
{
    // The underlying protocol-level builder.
    builder: B,
    // The most recently seen key-value pair.  It is buffered here, and pushed
    // to `builder` only when a different pair arrives (or on `done`), so that
    // all of its time-diff pairs can be pushed first, as the [Builder]
    // protocol requires.
    kv: Box<DynPair<Output::Key, Output::Val>>,
    // Whether `kv` currently holds a buffered pair (false until the first
    // tuple is pushed).
    has_kv: bool,
}
1077
impl<B, Output> TupleBuilder<B, Output>
where
    B: Builder<Output>,
    Output: Batch,
{
    /// Wraps `builder`, using `factories` to allocate the buffered key-value
    /// pair.
    pub fn new(factories: &Output::Factories, builder: B) -> Self {
        Self {
            builder,
            kv: factories.item_factory().default_box(),
            has_kv: false,
        }
    }

    /// Returns the number of keys pushed to the underlying builder so far.
    /// (A buffered, not-yet-flushed key is not counted.)
    pub fn num_keys(&self) -> usize {
        self.builder.num_keys()
    }

    /// Returns the number of tuples pushed to the underlying builder so far.
    pub fn num_tuples(&self) -> usize {
        self.builder.num_tuples()
    }

    /// Adds `element` to the batch.
    pub fn push(&mut self, element: &mut DynPair<DynPair<Output::Key, Output::Val>, Output::R>)
    where
        Output::Time: PartialEq<()>,
    {
        let (kv, w) = element.split_mut();
        let (k, v) = kv.split_mut();
        self.push_vals(k, v, &mut Output::Time::default(), w);
    }

    /// Adds tuple `(key, val, time, weight)` to the batch.
    ///
    /// Tuples must arrive grouped by key, then by value within a key, so that
    /// the buffered pair can be flushed exactly once per distinct pair.
    pub fn push_refs(
        &mut self,
        key: &Output::Key,
        val: &Output::Val,
        time: &Output::Time,
        weight: &Output::R,
    ) {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            if k != key {
                // New key: flush the buffered value and key (in that order,
                // per the Builder protocol), then buffer the new pair.
                self.builder.push_val_mut(v);
                self.builder.push_key_mut(k);
                self.kv.from_refs(key, val);
            } else if v != val {
                // Same key, new value: flush only the buffered value and
                // replace it in the buffer.
                self.builder.push_val_mut(v);
                val.clone_to(v);
            }
        } else {
            // First tuple ever: just start buffering.
            self.has_kv = true;
            self.kv.from_refs(key, val);
        }
        // The time-diff always belongs to the (now) buffered pair.
        self.builder.push_time_diff(time, weight);
    }

    /// Adds tuple `(key, val, time, weight)` to the batch.
    ///
    /// Like [`Self::push_refs`], but may consume the arguments instead of
    /// cloning them.
    pub fn push_vals(
        &mut self,
        key: &mut Output::Key,
        val: &mut Output::Val,
        time: &mut Output::Time,
        weight: &mut Output::R,
    ) {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            if k != key {
                self.builder.push_val_mut(v);
                self.builder.push_key_mut(k);
                self.kv.from_vals(key, val);
            } else if v != val {
                self.builder.push_val_mut(v);
                val.move_to(v);
            }
        } else {
            self.has_kv = true;
            self.kv.from_vals(key, val);
        }
        self.builder.push_time_diff_mut(time, weight);
    }

    /// Allocates room for `additional` keys in the underlying builder.
    pub fn reserve(&mut self, additional: usize) {
        self.builder.reserve(additional)
    }

    /// Adds all of the tuples in `iter` to the batch.
    pub fn extend<'a, I>(&mut self, iter: I)
    where
        Output::Time: PartialEq<()>,
        I: Iterator<Item = &'a mut WeightedItem<Output::Key, Output::Val, Output::R>>,
    {
        // Use the iterator's size hint to reserve once up front.
        let (lower, upper) = iter.size_hint();
        self.reserve(upper.unwrap_or(lower));

        for item in iter {
            let (kv, w) = item.split_mut();
            let (k, v) = kv.split_mut();

            self.push_vals(k, v, &mut Output::Time::default(), w);
        }
    }

    /// Completes building and returns the batch.
    pub fn done(mut self) -> Output {
        // Flush the final buffered pair, if any.
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            self.builder.push_val_mut(v);
            self.builder.push_key_mut(k);
        }
        self.builder.done()
    }
}
1190
1191/// Merges all of the batches in `batches`, applying `key_filter` and
1192/// `value_filter`, and returns the merged result.
1193///
1194/// The filters won't be applied to batches that don't get merged at all, that
1195/// is, if `batches` contains only one non-empty batch, or if it contains two
1196/// small batches that merge to become an empty batch alongside a third larger
1197/// batch, etc.
1198pub fn merge_batches<B, T>(
1199    factories: &B::Factories,
1200    batches: T,
1201    key_filter: &Option<Filter<B::Key>>,
1202    value_filter: &Option<GroupFilter<B::Val>>,
1203) -> B
1204where
1205    T: IntoIterator<Item = B>,
1206    B: Batch,
1207{
1208    // Collect input batches, discarding empty batches.
1209    let mut batches = batches
1210        .into_iter()
1211        .filter(|b| !b.is_empty())
1212        .collect::<Vec<_>>();
1213
1214    // Merge groups of up to 64 input batches to one output batch each.
1215    //
1216    // In practice, there are <= 64 input batches and 1 output batch (or 0 if
1217    // the inputs cancel each other out).
1218    while batches.len() > 1 {
1219        let mut inputs = batches.split_off(batches.len().saturating_sub(64));
1220        let result: B = ListMerger::merge(
1221            factories,
1222            B::Builder::for_merge(factories, &inputs, Some(BatchLocation::Memory)),
1223            inputs
1224                .iter_mut()
1225                .map(|b| b.consuming_cursor(key_filter.clone(), value_filter.clone()))
1226                .collect(),
1227        );
1228        if !result.is_empty() {
1229            batches.push(result);
1230        }
1231    }
1232
1233    // Take the final output batch, or synthesize an empty one if all the
1234    // batches added up to nothing.
1235    batches.pop().unwrap_or_else(|| B::dyn_empty(factories))
1236}
1237
1238/// Merges all of the batches in `batches`, applying `key_filter` and
1239/// `value_filter`, and returns the merged result.
1240///
1241/// Every tuple will be passed through the filters.
1242pub fn merge_batches_by_reference<'a, B, T>(
1243    factories: &B::Factories,
1244    batches: T,
1245    key_filter: &Option<Filter<B::Key>>,
1246    value_filter: &Option<GroupFilter<B::Val>>,
1247) -> B
1248where
1249    T: IntoIterator<Item = &'a B>,
1250    B: Batch,
1251{
1252    // Collect input batches, discarding empty batches.
1253    let mut batches = batches
1254        .into_iter()
1255        .filter(|b| !b.is_empty())
1256        .collect::<Vec<_>>();
1257
1258    // Merge groups of up to 64 input batches to one output batch each. This
1259    // also transforms `&B` in `batches` into `B` in `outputs`.
1260    //
1261    // In practice, there are <= 64 input batches and 1 output batch (or 0 if
1262    // the inputs cancel each other out).
1263    let mut outputs = Vec::with_capacity(batches.len().div_ceil(64));
1264    while !batches.is_empty() {
1265        let inputs = batches.split_off(batches.len().saturating_sub(64));
1266        let result: B = ListMerger::merge(
1267            factories,
1268            B::Builder::for_merge(
1269                factories,
1270                inputs.iter().cloned(),
1271                Some(BatchLocation::Memory),
1272            ),
1273            inputs
1274                .into_iter()
1275                .map(|b| b.merge_cursor(key_filter.clone(), value_filter.clone()))
1276                .collect(),
1277        );
1278        if !result.is_empty() {
1279            outputs.push(result);
1280        }
1281    }
1282
1283    // Merge the output batches (in practice, either 0 or 1 of them).
1284    merge_batches(factories, outputs, key_filter, value_filter)
1285}
1286
1287/// Compares two batches for equality.  This works regardless of whether the
1288/// batches are the same type, as long as their key, value, and weight types can
1289/// be compared for equality.
1290///
1291/// This can't be implemented as `PartialEq` because that is specialized for
1292/// comparing particular batch types (often in faster ways than this generic
1293/// function).  This function is mainly useful for testing in any case.
1294pub fn eq_batch<A, B, KA, VA, RA, KB, VB, RB>(a: &A, b: &B) -> bool
1295where
1296    A: BatchReader<Key = KA, Val = VA, Time = (), R = RA>,
1297    B: BatchReader<Key = KB, Val = VB, Time = (), R = RB>,
1298    KA: PartialEq<KB> + ?Sized,
1299    VA: PartialEq<VB> + ?Sized,
1300    RA: PartialEq<RB> + ?Sized,
1301    KB: ?Sized,
1302    VB: ?Sized,
1303    RB: ?Sized,
1304{
1305    let mut c1 = a.cursor();
1306    let mut c2 = b.cursor();
1307    while c1.key_valid() && c2.key_valid() {
1308        if c1.key() != c2.key() {
1309            return false;
1310        }
1311        while c1.val_valid() && c2.val_valid() {
1312            if c1.val() != c2.val() || c1.weight() != c2.weight() {
1313                return false;
1314            }
1315            c1.step_val();
1316            c2.step_val();
1317        }
1318        if c1.val_valid() || c2.val_valid() {
1319            return false;
1320        }
1321        c1.step_key();
1322        c2.step_key();
1323    }
1324    !c1.key_valid() && !c2.key_valid()
1325}
1326
/// Serializes a weighted set (a batch with unit values and times) with rkyv.
///
/// Layout: each key and its weight are serialized in cursor order, followed by
/// an archived `Vec<usize>` of their serializer offsets (alternating key,
/// weight).  The offsets vector is written last so that [`deserialize_wset`]
/// can read it back as the archive root.
fn serialize_wset<B, K, R>(batch: &B) -> Vec<u8>
where
    B: BatchReader<Key = K, Val = DynUnit, Time = (), R = R>,
    K: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    let mut s = Serializer::default();
    // Exactly one key offset and one weight offset per tuple.
    let mut offsets = Vec::with_capacity(2 * batch.len());
    let mut cursor = batch.cursor();
    while cursor.key_valid() {
        offsets.push(cursor.key().serialize(&mut s).unwrap());
        offsets.push(cursor.weight().serialize(&mut s).unwrap());
        cursor.step_key();
    }
    // Serialize the offsets vector last; it becomes the archive root.
    let _offset = s.serialize_value(&offsets).unwrap();
    s.into_serializer().into_inner().into_vec()
}
1344
/// Deserializes a weighted set previously written by [`serialize_wset`].
///
/// `data` must have been produced by [`serialize_wset`] for matching key and
/// weight types: the archive root is read as a `Vec<usize>` of alternating
/// key and weight offsets, and those offsets are trusted when deserializing.
fn deserialize_wset<B, K, R>(factories: &B::Factories, data: &[u8]) -> B
where
    B: Batch<Key = K, Val = DynUnit, Time = (), R = R>,
    K: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    // SAFETY: assumes `data` is a valid archive produced by `serialize_wset`,
    // whose root is the offsets vector.
    let offsets = unsafe { archived_root::<Vec<usize>>(data) };
    // Offsets come in (key, weight) pairs.
    assert!(offsets.len() % 2 == 0);
    let n = offsets.len() / 2;
    let mut builder = B::Builder::with_capacity(factories, n, n);
    let mut key = factories.key_factory().default_box();
    let mut diff = factories.weight_factory().default_box();
    for i in 0..n {
        // SAFETY: the offsets were recorded by `serialize_wset` and point at
        // serialized values of the expected types within `data`.
        unsafe { key.deserialize_from_bytes(data, offsets[i * 2] as usize) };
        unsafe { diff.deserialize_from_bytes(data, offsets[i * 2 + 1] as usize) };
        // Unit value, unit timestamp: push `((), diff)` then the key.
        builder.push_val_diff(&(), &diff);
        builder.push_key(&key);
    }
    builder.done()
}
1365
/// Separator that identifies the end of values for a key.
///
/// Stored (cast to `usize`) in the offsets vector between keys.  Presumably a
/// real serializer offset never reaches `u64::MAX`, so the sentinel cannot
/// collide with a genuine offset — NOTE(review): confirm this on 32-bit
/// targets, where `SEPARATOR as usize` truncates.
const SEPARATOR: u64 = u64::MAX;
1368
/// Serializes an indexed weighted set (a batch with unit times) with rkyv.
///
/// The offsets vector, written last as the archive root, has the layout:
/// the batch's tuple count, then for each key its offset, followed by a
/// `(value offset, weight offset)` pair for each of its values, terminated by
/// a [`SEPARATOR`] sentinel.  [`deserialize_indexed_wset`] is the inverse.
pub fn serialize_indexed_wset<B, K, V, R>(batch: &B) -> Vec<u8>
where
    B: BatchReader<Key = K, Val = V, Time = (), R = R>,
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    let mut s = Serializer::default();
    // Capacity is a hint only: the actual count also includes one entry per
    // key, one separator per key, and the leading length.
    let mut offsets = Vec::with_capacity(2 * batch.len());
    let mut cursor = batch.cursor();
    // First entry: total number of tuples, used to size the builder on read.
    offsets.push(batch.len());

    while cursor.key_valid() {
        offsets.push(cursor.key().serialize(&mut s).unwrap());
        while cursor.val_valid() {
            offsets.push(cursor.val().serialize(&mut s).unwrap());
            offsets.push(cursor.weight().serialize(&mut s).unwrap());

            cursor.step_val();
        }
        cursor.step_key();
        // Sentinel marking the end of this key's values.
        offsets.push(SEPARATOR as usize);
    }
    // Serialize the offsets vector last; it becomes the archive root.
    let _offset = s.serialize_value(&offsets).unwrap();
    s.into_serializer().into_inner().into_vec()
}
1395
/// Deserializes an indexed weighted set previously written by
/// [`serialize_indexed_wset`].
///
/// `data` must have been produced by [`serialize_indexed_wset`] for matching
/// key, value, and weight types; the offsets read from the archive root are
/// trusted when deserializing, and a malformed buffer may panic on an
/// out-of-bounds offsets index.
pub fn deserialize_indexed_wset<B, K, V, R>(factories: &B::Factories, data: &[u8]) -> B
where
    B: Batch<Key = K, Val = V, Time = (), R = R>,
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    // SAFETY: assumes `data` is a valid archive produced by
    // `serialize_indexed_wset`, whose root is the offsets vector.
    let offsets = unsafe { archived_root::<Vec<usize>>(data) };
    // First entry holds the total tuple count (a capacity hint for the builder).
    let len = offsets[0];

    let mut builder = B::Builder::with_capacity(factories, len as usize, len as usize);
    let mut key = factories.key_factory().default_box();
    let mut val = factories.val_factory().default_box();
    let mut diff = factories.weight_factory().default_box();

    // Start past the leading length entry.
    let mut current_offset = 1;

    while current_offset < offsets.len() {
        // SAFETY (here and below): offsets were recorded by the serializer and
        // point at serialized values of the expected types within `data`.
        unsafe { key.deserialize_from_bytes(data, offsets[current_offset] as usize) };
        current_offset += 1;
        // Consume (value, weight) pairs until this key's SEPARATOR sentinel.
        while offsets[current_offset] != SEPARATOR {
            unsafe { val.deserialize_from_bytes(data, offsets[current_offset] as usize) };
            current_offset += 1;
            unsafe { diff.deserialize_from_bytes(data, offsets[current_offset] as usize) };

            builder.push_val_diff(&val, &diff);
            current_offset += 1;
        }

        // Skip the separator, then emit the key after all its values.
        current_offset += 1;
        builder.push_key(&key);
    }
    builder.done()
}
1430
#[cfg(test)]
mod serialize_test {
    use crate::{
        DynZWeight, OrdIndexedZSet,
        algebra::OrdIndexedZSet as DynOrdIndexedZSet,
        dynamic::DynData,
        indexed_zset,
        trace::{BatchReader, deserialize_indexed_wset, serialize_indexed_wset},
    };

    /// Round-trips empty, single-tuple, and multi-key/multi-value indexed
    /// Z-sets through serialize/deserialize and checks for equality.
    #[test]
    fn test_serialize_indexed_wset() {
        let test1: OrdIndexedZSet<u64, u64> = indexed_zset! {};
        let test2 = indexed_zset! { 1 => { 1 => 1 } };
        let test3 =
            indexed_zset! { 1 => { 1 => 1, 2 => 2, 3 => 3 }, 2 => { 1 => 1, 2 => 2, 3 => 3 } };

        for test in [test1, test2, test3] {
            let serialized = serialize_indexed_wset(&*test);
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);

            assert_eq!(&*test, &deserialized);
        }
    }

    /// Round-trips Z-sets whose key type is the unit tuple `()`.
    #[test]
    fn test_serialize_indexed_wset_tup0_key() {
        let test1: OrdIndexedZSet<(), u64> = indexed_zset! {};
        let test2 = indexed_zset! { () => { 1 => 1 } };

        for test in [test1, test2] {
            let serialized = serialize_indexed_wset(&*test);
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);

            assert_eq!(&*test, &deserialized);
        }
    }

    /// Round-trips Z-sets whose value type is the unit tuple `()`.
    #[test]
    fn test_serialize_indexed_wset_tup0_val() {
        let test1: OrdIndexedZSet<u64, ()> = indexed_zset! {};
        let test2 = indexed_zset! { 1 => { () => 1 } };
        let test3 = indexed_zset! { 1 => { () => 1 }, 2 => { () => 1 } };

        for test in [test1, test2, test3] {
            let serialized = serialize_indexed_wset(&*test);
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);

            assert_eq!(&*test, &deserialized);
        }
    }
}