Skip to main content

dbsp/trace/ord/vec/
indexed_wset_batch.rs

1use crate::storage::file::{FilterStats, SerializerInner, TouchedWindowCount};
2use crate::trace::BatchLocation;
3use crate::trace::ord::merge_batcher::MergeBatcher;
4use crate::{
5    DBData, DBWeight, Error, NumEntries,
6    algebra::{NegByRef, ZRingValue},
7    circuit::checkpointer::Checkpoint,
8    dynamic::{
9        DataTrait, DynPair, DynVec, DynWeightedPairs, Erase, Factory, LeanVec, WeightTrait,
10        WeightTraitTyped, WithFactory,
11    },
12    trace::{
13        Batch, BatchFactories, BatchReader, BatchReaderFactories, Builder, Cursor, DbspSerializer,
14        Deserializer, Filter, GroupFilter, MergeCursor, VecValBatch, WeightedItem,
15        cursor::Position,
16        deserialize_indexed_wset,
17        layers::{
18            Cursor as _, Layer, LayerCursor, LayerFactories, Leaf, LeafFactories, OrdOffset, Trie,
19        },
20        serialize_indexed_wset,
21    },
22    utils::Tup2,
23};
24use crate::{DynZWeight, ZWeight};
25use itertools::{EitherOrBoth, Itertools};
26use rand::Rng;
27use rkyv::{Archive, Deserialize, Serialize};
28use size_of::SizeOf;
29use std::any::TypeId;
30use std::fmt::{self, Debug, Display};
31
32pub struct VecIndexedWSetFactories<K, V, R>
33where
34    K: DataTrait + ?Sized,
35    V: DataTrait + ?Sized,
36    R: WeightTrait + ?Sized,
37{
38    layer_factories: LayerFactories<K, LeafFactories<V, R>>,
39    item_factory: &'static dyn Factory<DynPair<K, V>>,
40    weighted_items_factory: &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>>,
41    weighted_vals_factory: &'static dyn Factory<DynWeightedPairs<V, R>>,
42    weighted_item_factory: &'static dyn Factory<DynPair<DynPair<K, V>, R>>,
43}
44
45impl<K, V, R> Clone for VecIndexedWSetFactories<K, V, R>
46where
47    K: DataTrait + ?Sized,
48    V: DataTrait + ?Sized,
49    R: WeightTrait + ?Sized,
50{
51    fn clone(&self) -> Self {
52        Self {
53            layer_factories: self.layer_factories.clone(),
54            item_factory: self.item_factory,
55            weighted_items_factory: self.weighted_items_factory,
56            weighted_item_factory: self.weighted_item_factory,
57            weighted_vals_factory: self.weighted_vals_factory,
58        }
59    }
60}
61
62impl<K, V, R> BatchReaderFactories<K, V, (), R> for VecIndexedWSetFactories<K, V, R>
63where
64    K: DataTrait + ?Sized,
65    V: DataTrait + ?Sized,
66    R: WeightTrait + ?Sized,
67{
68    fn new<KType, VType, RType>() -> Self
69    where
70        KType: DBData + Erase<K>,
71        VType: DBData + Erase<V>,
72        RType: DBWeight + Erase<R>,
73    {
74        Self {
75            layer_factories: LayerFactories::new::<KType>(LeafFactories::new::<VType, RType>()),
76            item_factory: WithFactory::<Tup2<KType, VType>>::FACTORY,
77            weighted_items_factory:
78                WithFactory::<LeanVec<Tup2<Tup2<KType, VType>, RType>>>::FACTORY,
79            weighted_vals_factory: WithFactory::<LeanVec<Tup2<VType, RType>>>::FACTORY,
80            weighted_item_factory: WithFactory::<Tup2<Tup2<KType, VType>, RType>>::FACTORY,
81        }
82    }
83
84    fn key_factory(&self) -> &'static dyn Factory<K> {
85        self.layer_factories.key
86    }
87
88    fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>> {
89        self.layer_factories.keys
90    }
91
92    fn val_factory(&self) -> &'static dyn Factory<V> {
93        self.layer_factories.child.key
94    }
95
96    fn weight_factory(&self) -> &'static dyn Factory<R> {
97        self.layer_factories.child.diff
98    }
99}
100
101impl<K, V, R> BatchFactories<K, V, (), R> for VecIndexedWSetFactories<K, V, R>
102where
103    K: DataTrait + ?Sized,
104    V: DataTrait + ?Sized,
105    R: WeightTrait + ?Sized,
106{
107    fn item_factory(&self) -> &'static dyn Factory<DynPair<K, V>> {
108        self.item_factory
109    }
110
111    fn weighted_items_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>> {
112        self.weighted_items_factory
113    }
114
115    fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<V, R>> {
116        self.weighted_vals_factory
117    }
118
119    fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, V, R>> {
120        self.weighted_item_factory
121    }
122
123    fn time_diffs_factory(
124        &self,
125    ) -> Option<&'static dyn Factory<DynWeightedPairs<crate::dynamic::DynDataTyped<()>, R>>> {
126        None
127    }
128
129    // fn weighted_item_factory(&self) -> &'static <Pair<K, V>, R> {
130    //     self.weighted_item_factory
131    // }
132
133    // fn batch_item_factory(&self) -> &'static BatchItemFactory<K, V, Pair<K, V>,
134    // R> {     self.batch_item_factory
135    // }
136
137    /*fn item_from<'a, I>(
138        &self,
139        key: OwnedInitRef<'a, K>,
140        val: OwnedInitRef<'a, V>,
141        weight: OwnedInitRef<'a, R>,
142        item: I,
143    ) -> I::Output
144    where
145        I: MutRef<Self::WeightedItem>,
146    {
147        let (keyval_ref, weight_ref) = self.weighted_item_factory.split(item);
148        let (key_ref, val_ref) = self.item_factory.split(keyval_uninit);
149        self.layer_factories.key.write_uninit(key, key_uninit);
150        self.layer_factories.child.key.write_uninit(val, val_uninit);
151        self.layer_factories
152            .child
153            .diff
154            .write_uninit(weight, weight_uninit);
155    }*/
156}
157
158type Layers<K, V, R, O> = Layer<K, Leaf<V, R>, O>;
159
160/// An immutable collection of update tuples.
161pub struct VecIndexedWSet<K, V, R, O = usize>
162where
163    K: DataTrait + ?Sized,
164    V: DataTrait + ?Sized,
165    R: WeightTrait + ?Sized,
166    O: OrdOffset,
167{
168    /// Where all the data is.
169    #[doc(hidden)]
170    pub layer: Layers<K, V, R, O>,
171    factories: VecIndexedWSetFactories<K, V, R>,
172    negative_weight_count: u64,
173    touched_window_count: TouchedWindowCount,
174}
175
176impl<K, V, R, O> VecIndexedWSet<K, V, R, O>
177where
178    K: DataTrait + ?Sized,
179    V: DataTrait + ?Sized,
180    R: WeightTrait + ?Sized,
181    O: OrdOffset,
182{
183    pub fn from_parts(
184        factories: VecIndexedWSetFactories<K, V, R>,
185        keys: Box<DynVec<K>>,
186        offs: Vec<O>,
187        vals: Box<DynVec<V>>,
188        diffs: Box<DynVec<R>>,
189        negative_weight_count: u64,
190    ) -> Self {
191        Self {
192            layer: Layer::from_parts(
193                &factories.layer_factories,
194                keys,
195                offs,
196                Leaf::from_parts(&factories.layer_factories.child, vals, diffs),
197            ),
198            factories,
199            negative_weight_count,
200            touched_window_count: TouchedWindowCount::default(),
201        }
202    }
203}
204
205impl<K, V, R, O> SizeOf for VecIndexedWSet<K, V, R, O>
206where
207    K: DataTrait + ?Sized,
208    V: DataTrait + ?Sized,
209    R: WeightTrait + ?Sized,
210    O: OrdOffset,
211{
212    fn size_of_children(&self, context: &mut size_of::Context) {
213        // This is only approximate but it is *much* cheaper than measuring all
214        // the elements individually.
215        context.add(self.approximate_byte_size());
216    }
217}
218
219impl<K, V, R, O> PartialEq for VecIndexedWSet<K, V, R, O>
220where
221    K: DataTrait + ?Sized,
222    V: DataTrait + ?Sized,
223    R: WeightTrait + ?Sized,
224    O: OrdOffset,
225{
226    fn eq(&self, other: &Self) -> bool {
227        self.layer == other.layer
228    }
229}
230
231impl<K, V, R, O> Eq for VecIndexedWSet<K, V, R, O>
232where
233    K: DataTrait + ?Sized,
234    V: DataTrait + ?Sized,
235    R: WeightTrait + ?Sized,
236    O: OrdOffset,
237{
238}
239
240impl<K, V, R, O> Debug for VecIndexedWSet<K, V, R, O>
241where
242    K: DataTrait + ?Sized,
243    V: DataTrait + ?Sized,
244    R: WeightTrait + ?Sized,
245    O: OrdOffset,
246{
247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248        f.debug_struct("VecIndexedWSet")
249            .field("layer", &self.layer)
250            .finish()
251    }
252}
253
254impl<K, V, R, O: OrdOffset> Clone for VecIndexedWSet<K, V, R, O>
255where
256    K: DataTrait + ?Sized,
257    V: DataTrait + ?Sized,
258    R: WeightTrait + ?Sized,
259{
260    fn clone(&self) -> Self {
261        Self {
262            layer: self.layer.clone(),
263            factories: self.factories.clone(),
264            negative_weight_count: self.negative_weight_count,
265            touched_window_count: self.touched_window_count,
266        }
267    }
268}
269
270impl<K, V, R, O> Display for VecIndexedWSet<K, V, R, O>
271where
272    K: DataTrait + ?Sized,
273    V: DataTrait + ?Sized,
274    R: WeightTrait + ?Sized,
275    O: OrdOffset,
276{
277    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
278        writeln!(
279            f,
280            "layer:\n{}",
281            textwrap::indent(&self.layer.to_string(), "    ")
282        )
283    }
284}
285
286/*impl<K, V, R, KV, KVR, O> Default for VecIndexedWSet<K, V, R, KV, KVR, O>
287where
288    O: OrdOffset,
289{
290    #[inline]
291    fn default() -> Self {
292        Self::empty(())
293    }
294}*/
295
296/*impl<K, V, R, KV, KVR, O> From<Layers<K, V, R, O>> for VecIndexedWSet<K, V, R, KV, KVR, O>
297where
298    O: OrdOffset,
299{
300    #[inline]
301    fn from(layer: Layers<K, V, R, O>) -> Self {
302        Self { layer }
303    }
304}
305
306impl<K, V, R, O> From<Layers<K, V, R, O>> for Rc<VecIndexedWSet<K, V, R, O>>
307where
308    K: Ord,
309    V: Ord,
310    R: Clone,
311    O: OrdOffset,
312{
313    #[inline]
314    fn from(layer: Layers<K, V, R, O>) -> Self {
315        Rc::new(From::from(layer))
316    }
317}*/
318
319impl<K, V, R, O> NumEntries for VecIndexedWSet<K, V, R, O>
320where
321    K: DataTrait + ?Sized,
322    V: DataTrait + ?Sized,
323    R: WeightTrait + ?Sized,
324    O: OrdOffset,
325{
326    const CONST_NUM_ENTRIES: Option<usize> = Layers::<K, V, R, O>::CONST_NUM_ENTRIES;
327
328    #[inline]
329    fn num_entries_shallow(&self) -> usize {
330        self.layer.num_entries_shallow()
331    }
332
333    #[inline]
334    fn num_entries_deep(&self) -> usize {
335        self.layer.num_entries_deep()
336    }
337}
338
339impl<K, V, R, O> NegByRef for VecIndexedWSet<K, V, R, O>
340where
341    K: DataTrait + ?Sized,
342    V: DataTrait + ?Sized,
343    R: WeightTraitTyped + ?Sized,
344    R::Type: DBWeight + ZRingValue + Erase<R>,
345    O: OrdOffset,
346{
347    #[inline]
348    fn neg_by_ref(&self) -> Self {
349        Self {
350            layer: self.layer.neg_by_ref(),
351            factories: self.factories.clone(),
352            negative_weight_count: self.negative_weight_count,
353            touched_window_count: self.touched_window_count,
354        }
355    }
356}
357
358impl<K, V, R, O: OrdOffset> Deserialize<VecIndexedWSet<K, V, R, O>, Deserializer> for ()
359where
360    K: DataTrait + ?Sized,
361    V: DataTrait + ?Sized,
362    R: WeightTrait + ?Sized,
363    O: OrdOffset,
364{
365    fn deserialize(
366        &self,
367        _deserializer: &mut Deserializer,
368    ) -> Result<VecIndexedWSet<K, V, R, O>, <Deserializer as rkyv::Fallible>::Error> {
369        todo!()
370    }
371}
372
373impl<K, V, R, O: OrdOffset> Archive for VecIndexedWSet<K, V, R, O>
374where
375    K: DataTrait + ?Sized,
376    V: DataTrait + ?Sized,
377    R: WeightTrait + ?Sized,
378    O: OrdOffset,
379{
380    type Archived = ();
381    type Resolver = ();
382
383    unsafe fn resolve(&self, _pos: usize, _resolver: Self::Resolver, _out: *mut Self::Archived) {
384        todo!()
385    }
386}
387impl<K, V, R, O> Serialize<DbspSerializer<'_>> for VecIndexedWSet<K, V, R, O>
388where
389    K: DataTrait + ?Sized,
390    V: DataTrait + ?Sized,
391    R: WeightTrait + ?Sized,
392    O: OrdOffset,
393{
394    fn serialize(
395        &self,
396        _serializer: &mut DbspSerializer,
397    ) -> Result<Self::Resolver, <DbspSerializer<'_> as rkyv::Fallible>::Error> {
398        todo!()
399    }
400}
401
402/*pub trait Deserializable: Archive<Archived = Self::ArchivedDeser> + Sized {
403    type ArchivedDeser: Deserialize<Self, Deserializer>;
404}
405impl<T: Archive> Deserializable for T
406where
407    Archived<T>: Deserialize<T, Deserializer>,
408{
409    type ArchivedDeser = Archived<T>;
410*/
411
412impl<K, V, R, O> BatchReader for VecIndexedWSet<K, V, R, O>
413where
414    K: DataTrait + ?Sized,
415    V: DataTrait + ?Sized,
416    R: WeightTrait + ?Sized,
417    O: OrdOffset,
418{
419    type Key = K;
420    type Val = V;
421    type Time = ();
422    type R = R;
423    type Cursor<'s>
424        = VecIndexedWSetCursor<'s, K, V, R, O>
425    where
426        V: 's,
427        O: 's;
428    type Factories = VecIndexedWSetFactories<K, V, R>;
429    // type Consumer = VecIndexedWSetConsumer<K, V, R, O>;
430
431    #[inline]
432    fn cursor(&self) -> Self::Cursor<'_> {
433        VecIndexedWSetCursor::new(self)
434    }
435
436    fn consuming_cursor(
437        &mut self,
438        key_filter: Option<Filter<Self::Key>>,
439        value_filter: Option<GroupFilter<Self::Val>>,
440    ) -> Box<dyn crate::trace::MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_>
441    {
442        if key_filter.is_none() && value_filter.is_none() {
443            Box::new(VecIndexedWSetConsumingCursor::new(self))
444        } else {
445            self.merge_cursor(key_filter, value_filter)
446        }
447    }
448
449    fn factories(&self) -> Self::Factories {
450        self.factories.clone()
451    }
452
453    /*#[inline]
454    fn consumer(self) -> Self::Consumer {
455        VecIndexedWSetConsumer {
456            consumer: OrderedLayerConsumer::from(self.layer),
457        }
458    }*/
459
460    #[inline]
461    fn key_count(&self) -> usize {
462        self.layer.keys()
463    }
464
465    #[inline]
466    fn len(&self) -> usize {
467        self.layer.tuples()
468    }
469
470    #[inline]
471    fn approximate_byte_size(&self) -> usize {
472        self.layer.approximate_byte_size()
473    }
474
475    fn membership_filter_stats(&self) -> FilterStats {
476        FilterStats::default()
477    }
478
479    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
480    where
481        RG: Rng,
482    {
483        self.layer.sample_keys(rng, sample_size, sample);
484    }
485
486    fn keys(&self) -> Option<&DynVec<Self::Key>> {
487        Some(&*self.layer.keys)
488    }
489}
490
491impl<K, V, R, O> Batch for VecIndexedWSet<K, V, R, O>
492where
493    K: DataTrait + ?Sized,
494    V: DataTrait + ?Sized,
495    R: WeightTrait + ?Sized,
496    O: OrdOffset,
497{
498    type Timed<T: crate::Timestamp> = VecValBatch<K, V, T, R, O>;
499    type Batcher = MergeBatcher<Self>;
500    type Builder = VecIndexedWSetBuilder<K, V, R, O>;
501
502    fn key_bounds(&self) -> Option<(&Self::Key, &Self::Key)> {
503        Some((self.layer.keys.first()?, self.layer.keys.last()?))
504    }
505
506    fn negative_weight_count(&self) -> Option<u64> {
507        Some(self.negative_weight_count)
508    }
509
510    fn touched_window_count(&self) -> TouchedWindowCount {
511        self.touched_window_count
512    }
513}
514
515/// A cursor for navigating a single layer.
516#[derive(Debug, SizeOf)]
517pub struct VecIndexedWSetCursor<'s, K, V, R, O = usize>
518where
519    K: DataTrait + ?Sized,
520    V: DataTrait + ?Sized,
521    R: WeightTrait + ?Sized,
522    O: OrdOffset,
523{
524    pub(crate) cursor: LayerCursor<'s, K, Leaf<V, R>, O>,
525}
526
527impl<'s, K, V, R, O> VecIndexedWSetCursor<'s, K, V, R, O>
528where
529    K: DataTrait + ?Sized,
530    V: DataTrait + ?Sized,
531    R: WeightTrait + ?Sized,
532    O: OrdOffset,
533{
534    pub fn new(zset: &'s VecIndexedWSet<K, V, R, O>) -> Self {
535        Self {
536            cursor: zset.layer.cursor(),
537        }
538    }
539
540    pub fn new_from(zset: &'s VecIndexedWSet<K, V, R, O>, lower_bound: usize) -> Self {
541        Self {
542            cursor: zset.layer.cursor_from(lower_bound, zset.layer.keys()),
543        }
544    }
545}
546impl<K, V, R, O> Clone for VecIndexedWSetCursor<'_, K, V, R, O>
547where
548    K: DataTrait + ?Sized,
549    V: DataTrait + ?Sized,
550    R: WeightTrait + ?Sized,
551    O: OrdOffset,
552{
553    fn clone(&self) -> Self {
554        Self {
555            cursor: self.cursor.clone(),
556        }
557    }
558}
559
560impl<K, V, R, O> Cursor<K, V, (), R> for VecIndexedWSetCursor<'_, K, V, R, O>
561where
562    K: DataTrait + ?Sized,
563    V: DataTrait + ?Sized,
564    R: WeightTrait + ?Sized,
565    O: OrdOffset,
566{
567    // fn key_factory(&self) -> &'static Factory<K> {
568    //     self.cursor.storage.factories.key
569    // }
570
571    // fn val_factory(&self) -> &'static Factory<V> {
572    //     self.cursor.child.storage.factories.key
573    // }
574
575    fn weight_factory(&self) -> &'static dyn Factory<R> {
576        self.cursor.child.storage.factories.diff
577    }
578
579    fn key(&self) -> &K {
580        self.cursor.item()
581    }
582
583    fn val(&self) -> &V {
584        self.cursor.child.current_key()
585    }
586
587    fn map_times(&mut self, logic: &mut dyn FnMut(&(), &R)) {
588        if self.cursor.child.valid() {
589            logic(&(), self.cursor.child.current_diff())
590        }
591    }
592
593    fn map_times_through(&mut self, _upper: &(), logic: &mut dyn FnMut(&(), &R)) {
594        self.map_times(logic)
595    }
596
597    fn weight(&mut self) -> &R {
598        debug_assert!(&self.cursor.valid());
599        self.cursor.child.current_diff()
600    }
601
602    fn weight_checked(&mut self) -> &R {
603        self.weight()
604    }
605
606    fn map_values(&mut self, logic: &mut dyn FnMut(&V, &R)) {
607        while self.val_valid() {
608            logic(self.val(), self.cursor.child.current_diff());
609            self.step_val();
610        }
611    }
612
613    fn key_valid(&self) -> bool {
614        self.cursor.valid()
615    }
616
617    fn val_valid(&self) -> bool {
618        self.cursor.child.valid()
619    }
620
621    fn step_key(&mut self) {
622        self.cursor.step();
623    }
624
625    fn step_key_reverse(&mut self) {
626        self.cursor.step_reverse();
627    }
628
629    fn seek_key(&mut self, key: &K) {
630        self.cursor.seek(key);
631    }
632
633    fn seek_key_exact(&mut self, key: &K, _hash: Option<u64>) -> bool {
634        self.seek_key(key);
635        self.key_valid() && self.key().eq(key)
636    }
637
638    fn seek_key_with(&mut self, predicate: &dyn Fn(&K) -> bool) {
639        self.cursor.seek_with(predicate);
640    }
641
642    fn seek_key_reverse(&mut self, key: &K) {
643        self.cursor.seek_reverse(key);
644    }
645
646    fn seek_key_with_reverse(&mut self, predicate: &dyn Fn(&K) -> bool) {
647        self.cursor.seek_with_reverse(predicate);
648    }
649
650    fn step_val(&mut self) {
651        self.cursor.child.step();
652    }
653
654    fn seek_val(&mut self, val: &V) {
655        self.cursor.child.seek(val);
656    }
657
658    fn seek_val_with(&mut self, predicate: &dyn Fn(&V) -> bool) {
659        self.cursor.child.seek_key_with(predicate);
660    }
661
662    fn rewind_keys(&mut self) {
663        self.cursor.rewind();
664    }
665
666    fn fast_forward_keys(&mut self) {
667        self.cursor.fast_forward();
668    }
669
670    fn rewind_vals(&mut self) {
671        self.cursor.child.rewind();
672    }
673
674    fn step_val_reverse(&mut self) {
675        self.cursor.child.step_reverse();
676    }
677
678    fn seek_val_reverse(&mut self, val: &V) {
679        self.cursor.child.seek_reverse(val);
680    }
681
682    fn seek_val_with_reverse(&mut self, predicate: &dyn Fn(&V) -> bool) {
683        self.cursor.child.seek_key_with_reverse(predicate);
684    }
685
686    fn fast_forward_vals(&mut self) {
687        self.cursor.child.fast_forward();
688    }
689
690    fn position(&self) -> Option<Position> {
691        Some(Position {
692            total: self.cursor.keys() as u64,
693            offset: self.cursor.pos() as u64,
694        })
695    }
696}
697
698/// A builder for creating layers from unsorted update tuples.
699#[derive(SizeOf)]
700pub struct VecIndexedWSetBuilder<K, V, R, O = usize>
701where
702    K: DataTrait + ?Sized,
703    V: DataTrait + ?Sized,
704    R: WeightTrait + ?Sized,
705    O: OrdOffset,
706{
707    #[size_of(skip)]
708    factories: VecIndexedWSetFactories<K, V, R>,
709    keys: Box<DynVec<K>>,
710    offs: Vec<O>,
711    vals: Box<DynVec<V>>,
712    diffs: Box<DynVec<R>>,
713    negative_weight_count: u64,
714}
715
716impl<K, V, R, O> VecIndexedWSetBuilder<K, V, R, O>
717where
718    K: DataTrait + ?Sized,
719    V: DataTrait + ?Sized,
720    R: WeightTrait + ?Sized,
721    O: OrdOffset,
722{
723    fn pushed_key(&mut self) {
724        self.offs.push(O::from_usize(self.vals.len()));
725
726        debug_assert!(
727            {
728                let n = self.offs.len();
729                self.offs[n - 1] > self.offs[n - 2]
730            },
731            "every key must have at least one value"
732        );
733
734        debug_assert!(
735            {
736                let n = self.keys.len();
737                n == 1 || self.keys[n - 2] < self.keys[n - 1]
738            },
739            "keys must be strictly monotonically increasing but {:?} >= {:?}",
740            &self.keys[self.keys.len() - 2],
741            &self.keys[self.keys.len() - 1]
742        );
743    }
744
745    fn pushed_val(&self) {
746        debug_assert_eq!(
747            self.vals.len(),
748            self.diffs.len(),
749            "every value must have exactly one diff"
750        );
751
752        debug_assert!(
753            {
754                let n = self.vals.len();
755                let last_n = self.offs.last().unwrap().into_usize();
756                let n_vals = n - last_n;
757                n_vals < 2 || self.vals[n - 2] < self.vals[n - 1]
758            },
759            "values for a key must be strictly monotonically increasing but {:?} >= {:?}",
760            &self.vals[self.vals.len() - 2],
761            &self.vals[self.vals.len() - 1]
762        );
763    }
764
765    fn pushed_diff(&self) {
766        debug_assert_eq!(
767            self.vals.len() + 1,
768            self.diffs.len(),
769            "every value must have exactly one diff"
770        );
771    }
772
773    fn update_total_weight(&mut self, weight: &R) {
774        if TypeId::of::<R>() == TypeId::of::<DynZWeight>() {
775            let weight = unsafe { weight.downcast::<ZWeight>() };
776            if !weight.ge0() {
777                self.negative_weight_count += 1;
778            }
779        }
780    }
781
782    /// Copies the contents of this in-progress [Builder] to `dst`.
783    ///
784    /// This handles all the possible states that this builder can be in (such
785    /// as time-diff pairs without a value yet, and values without a key yet)
786    /// and reproduces them in `dst`.
787    pub fn copy_to_builder<B, BO>(&self, dst: &mut B)
788    where
789        B: Builder<BO>,
790        BO: Batch<Key = K, Val = V, R = R, Time = ()>,
791    {
792        let mut key_index = 0;
793        for (val_diff, val_index) in self
794            .vals
795            .dyn_iter()
796            .zip_longest(self.diffs.dyn_iter())
797            .zip(1..)
798        {
799            match val_diff {
800                EitherOrBoth::Both(val, diff) => {
801                    dst.push_val_diff(val, diff);
802                    if self
803                        .offs
804                        .get(key_index + 1)
805                        .is_some_and(|val_offset| O::from_usize(val_index) >= *val_offset)
806                    {
807                        dst.push_key(&self.keys[key_index]);
808                        key_index += 1;
809                    }
810                }
811                EitherOrBoth::Left(_) => unreachable!(),
812                EitherOrBoth::Right(diff) => {
813                    dst.push_time_diff(&(), diff);
814                }
815            }
816        }
817    }
818}
819
820impl<K, V, R, O> Builder<VecIndexedWSet<K, V, R, O>> for VecIndexedWSetBuilder<K, V, R, O>
821where
822    K: DataTrait + ?Sized,
823    V: DataTrait + ?Sized,
824    R: WeightTrait + ?Sized,
825    O: OrdOffset,
826{
827    fn with_capacity_in_location(
828        factories: &VecIndexedWSetFactories<K, V, R>,
829        key_capacity: usize,
830        value_capacity: usize,
831        _location: Option<BatchLocation>,
832    ) -> Self {
833        let mut keys = factories.layer_factories.keys.default_box();
834        keys.reserve_exact(key_capacity);
835
836        let mut offs = Vec::with_capacity(key_capacity + 1);
837        offs.push(O::zero());
838
839        let mut vals = factories.layer_factories.child.keys.default_box();
840        vals.reserve_exact(value_capacity);
841
842        let mut diffs = factories.layer_factories.child.diffs.default_box();
843        diffs.reserve_exact(value_capacity);
844        Self {
845            factories: factories.clone(),
846            keys,
847            offs,
848            vals,
849            diffs,
850            negative_weight_count: 0,
851        }
852    }
853
854    fn reserve(&mut self, additional: usize) {
855        self.keys.reserve(additional);
856        self.offs.reserve(additional);
857        self.vals.reserve(additional);
858        self.diffs.reserve(additional);
859    }
860
861    fn push_key(&mut self, key: &K) {
862        self.keys.push_ref(key);
863        self.pushed_key();
864    }
865
866    fn push_key_mut(&mut self, key: &mut K) {
867        self.keys.push_val(key);
868        self.pushed_key();
869    }
870
871    fn push_val(&mut self, val: &V) {
872        self.vals.push_ref(val);
873        self.pushed_val();
874    }
875
876    fn push_val_mut(&mut self, val: &mut V) {
877        self.vals.push_val(val);
878        self.pushed_val();
879    }
880
881    fn push_time_diff(&mut self, _time: &(), weight: &R) {
882        debug_assert!(!weight.is_zero());
883        self.update_total_weight(weight);
884        self.diffs.push_ref(weight);
885        self.pushed_diff();
886    }
887
888    fn push_time_diff_mut(&mut self, _time: &mut (), weight: &mut R) {
889        debug_assert!(!weight.is_zero());
890        self.update_total_weight(weight);
891        self.diffs.push_val(weight);
892        self.pushed_diff();
893    }
894
895    fn push_val_diff(&mut self, val: &V, weight: &R) {
896        debug_assert!(!weight.is_zero());
897        self.update_total_weight(weight);
898        self.vals.push_ref(val);
899        self.diffs.push_ref(weight);
900        self.pushed_val();
901    }
902
903    fn push_val_diff_mut(&mut self, val: &mut V, weight: &mut R) {
904        debug_assert!(!weight.is_zero());
905        self.update_total_weight(weight);
906        self.vals.push_val(val);
907        self.diffs.push_val(weight);
908        self.pushed_val();
909    }
910
911    fn done(self) -> VecIndexedWSet<K, V, R, O> {
912        VecIndexedWSet::from_parts(
913            self.factories,
914            self.keys,
915            self.offs,
916            self.vals,
917            self.diffs,
918            self.negative_weight_count,
919        )
920    }
921
922    fn num_keys(&self) -> usize {
923        self.keys.len()
924    }
925
926    fn num_tuples(&self) -> usize {
927        self.diffs.len()
928    }
929}
930
931/// A cursor for consuming a [VecIndexedWSet].
932struct VecIndexedWSetConsumingCursor<'a, K, V, R, O = usize>
933where
934    K: DataTrait + ?Sized,
935    V: DataTrait + ?Sized,
936    R: WeightTrait + ?Sized,
937    O: OrdOffset,
938{
939    wset: &'a mut VecIndexedWSet<K, V, R, O>,
940    key_index: usize,
941    val_index: usize,
942    val_max: usize,
943}
944
945impl<'a, K, V, R, O> VecIndexedWSetConsumingCursor<'a, K, V, R, O>
946where
947    K: DataTrait + ?Sized,
948    V: DataTrait + ?Sized,
949    R: WeightTrait + ?Sized,
950    O: OrdOffset,
951{
952    fn new(wset: &'a mut VecIndexedWSet<K, V, R, O>) -> Self {
953        let val_max = wset
954            .layer
955            .offs
956            .get(1)
957            .map_or(0, |offset| offset.into_usize());
958        Self {
959            wset,
960            key_index: 0,
961            val_index: 0,
962            val_max,
963        }
964    }
965}
966
967impl<K, V, R, O> MergeCursor<K, V, (), R> for VecIndexedWSetConsumingCursor<'_, K, V, R, O>
968where
969    K: DataTrait + ?Sized,
970    V: DataTrait + ?Sized,
971    R: WeightTrait + ?Sized,
972    O: OrdOffset,
973{
974    fn key_valid(&self) -> bool {
975        self.key_index < self.wset.layer.keys.len()
976    }
977    fn val_valid(&self) -> bool {
978        self.val_index < self.val_max
979    }
980    fn key(&self) -> &K {
981        self.wset.layer.keys.index(self.key_index)
982    }
983
984    fn val(&self) -> &V {
985        debug_assert!(self.val_valid());
986        &self.wset.layer.vals.keys[self.val_index]
987    }
988
989    fn map_times(&mut self, logic: &mut dyn FnMut(&(), &R)) {
990        logic(&(), &self.wset.layer.vals.diffs[self.val_index])
991    }
992
993    fn weight(&mut self) -> &R {
994        &self.wset.layer.vals.diffs[self.val_index]
995    }
996
997    fn has_mut(&self) -> bool {
998        true
999    }
1000
1001    fn key_mut(&mut self) -> &mut K {
1002        &mut self.wset.layer.keys[self.key_index]
1003    }
1004
1005    fn val_mut(&mut self) -> &mut V {
1006        &mut self.wset.layer.vals.keys[self.val_index]
1007    }
1008
1009    fn weight_mut(&mut self) -> &mut R {
1010        &mut self.wset.layer.vals.diffs[self.val_index]
1011    }
1012
1013    fn step_key(&mut self) {
1014        self.key_index += 1;
1015        if self.key_valid() {
1016            self.val_index = self.wset.layer.offs[self.key_index].into_usize();
1017            self.val_max = self.wset.layer.offs[self.key_index + 1].into_usize();
1018        } else {
1019            self.val_index = 0;
1020            self.val_max = 0;
1021        }
1022    }
1023
1024    fn step_val(&mut self) {
1025        self.val_index += 1;
1026    }
1027}
1028
1029impl<K, V, R> Checkpoint for VecIndexedWSet<K, V, R>
1030where
1031    K: DataTrait + ?Sized,
1032    V: DataTrait + ?Sized,
1033    R: WeightTrait + ?Sized,
1034{
1035    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
1036        Ok(serialize_indexed_wset(self, &mut SerializerInner::new()).into_vec())
1037    }
1038
1039    fn restore(&mut self, data: &[u8]) -> Result<(), Error> {
1040        *self = deserialize_indexed_wset(&self.factories, data);
1041        Ok(())
1042    }
1043}