Skip to main content

dbsp/operator/dynamic/group/
lag.rs

1use super::{GroupTransformer, Monotonicity};
2use crate::Circuit;
3use crate::algebra::{OrdIndexedZSetFactories, ZRingValue};
4use crate::operator::dynamic::filter_map::DynFilterMap;
5use crate::{
6    DBData, DynZWeight, RootCircuit, Stream, ZWeight,
7    algebra::{HasZero, IndexedZSet, OrdIndexedZSet, ZCursor},
8    dynamic::{
9        ClonableTrait, DataTrait, DynData, DynPair, DynUnit, DynVec, Erase, Factory, LeanVec,
10        WithFactory,
11    },
12    operator::dynamic::MonoIndexedZSet,
13    trace::{
14        BatchReaderFactories,
15        cursor::{CursorPair, ReverseKeyCursor},
16    },
17    utils::Tup2,
18};
19use std::{cmp::Ordering, marker::PhantomData, ops::Neg};
20
/// Capacity threshold for the scratch buffers kept by `Lag` between calls.
///
/// `Lag::compute_updates` reuses `encountered_keys`/`affected_keys` across
/// invocations, but releases both allocations once the capacity of
/// `encountered_keys` reaches this many entries.
const MAX_RETRACTIONS_CAPACITY: usize = 100_000;
22
23pub struct LagFactories<B: IndexedZSet, OV: DataTrait + ?Sized> {
24    input_factories: B::Factories,
25    output_factories: OrdIndexedZSetFactories<B::Key, DynPair<B::Val, OV>>,
26    keys_factory: &'static dyn Factory<AffectedKeys<B::Val>>,
27    output_val_factory: &'static dyn Factory<OV>,
28}
29
30impl<B, OV> LagFactories<B, OV>
31where
32    B: IndexedZSet,
33    OV: DataTrait + ?Sized,
34{
35    pub fn new<KType, VType, OVType>() -> Self
36    where
37        KType: DBData + Erase<B::Key>,
38        VType: DBData + Erase<B::Val>,
39        OVType: DBData + Erase<OV>,
40    {
41        Self {
42            input_factories: BatchReaderFactories::new::<KType, VType, ZWeight>(),
43            output_factories: BatchReaderFactories::new::<KType, Tup2<VType, OVType>, ZWeight>(),
44            keys_factory: WithFactory::<LeanVec<VType>>::FACTORY,
45            output_val_factory: WithFactory::<OVType>::FACTORY,
46        }
47    }
48}
49
50pub struct LagCustomOrdFactories<
51    B: IndexedZSet,
52    V2: DataTrait + ?Sized,
53    VL: DataTrait + ?Sized,
54    OV: DataTrait + ?Sized,
55> {
56    lag_factories: LagFactories<OrdIndexedZSet<B::Key, V2>, VL>,
57    output_factories: OrdIndexedZSetFactories<B::Key, OV>,
58}
59
60impl<B, V2, VL, OV> LagCustomOrdFactories<B, V2, VL, OV>
61where
62    B: IndexedZSet,
63    V2: DataTrait + ?Sized,
64    VL: DataTrait + ?Sized,
65    OV: DataTrait + ?Sized,
66{
67    pub fn new<KType, VType, V2Type, VLType, OVType>() -> Self
68    where
69        KType: DBData + Erase<B::Key>,
70        VType: DBData + Erase<B::Val>,
71        V2Type: DBData + Erase<V2>,
72        VLType: DBData + Erase<VL>,
73        OVType: DBData + Erase<OV>,
74    {
75        Self {
76            lag_factories: LagFactories::new::<KType, V2Type, VLType>(),
77            output_factories: BatchReaderFactories::new::<KType, OVType, ZWeight>(),
78        }
79    }
80}
81
82impl<B> Stream<RootCircuit, B>
83where
84    B: IndexedZSet + Send,
85{
86    /// See [`Stream::lag`].
87    #[allow(clippy::type_complexity)]
88    pub fn dyn_lag<OV>(
89        &self,
90        persistent_id: Option<&str>,
91        factories: &LagFactories<B, OV>,
92        offset: isize,
93        project: Box<dyn Fn(Option<&B::Val>, &mut OV)>,
94    ) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, DynPair<B::Val, OV>>>
95    where
96        OV: DataTrait + ?Sized,
97    {
98        self.dyn_group_transform(
99            persistent_id,
100            &factories.input_factories,
101            &factories.output_factories,
102            Box::new(Lag::new(
103                factories.output_factories.val_factory(),
104                factories.keys_factory,
105                factories.output_val_factory,
106                offset.unsigned_abs(),
107                offset > 0,
108                project,
109                if offset > 0 {
110                    |k1: &B::Val, k2: &B::Val| k1.cmp(k2)
111                } else {
112                    |k1: &B::Val, k2: &B::Val| k2.cmp(k1)
113                },
114            )),
115        )
116    }
117}
118
119impl Stream<RootCircuit, MonoIndexedZSet> {
120    pub fn dyn_lag_custom_order_mono(
121        &self,
122        persistent_id: Option<&str>,
123        factories: &LagCustomOrdFactories<MonoIndexedZSet, DynData, DynData, DynData>,
124        offset: isize,
125        encode: Box<dyn Fn(&DynData, &mut DynData)>,
126        project: Box<dyn Fn(Option<&DynData>, &mut DynData)>,
127        decode: Box<dyn Fn(&DynData, &DynData, &mut DynData)>,
128    ) -> Stream<RootCircuit, MonoIndexedZSet> {
129        self.dyn_lag_custom_order(persistent_id, factories, offset, encode, project, decode)
130    }
131}
132
133impl<B, K, V> Stream<RootCircuit, B>
134where
135    B: IndexedZSet<Key = K, Val = V> + Send,
136    K: DataTrait + ?Sized,
137    V: DataTrait + ?Sized,
138{
139    /// See [`Stream::lag_custom_order`].
140    pub fn dyn_lag_custom_order<V2, VL, OV>(
141        &self,
142        persistent_id: Option<&str>,
143        factories: &LagCustomOrdFactories<B, V2, VL, OV>,
144        offset: isize,
145        encode: Box<dyn Fn(&V, &mut V2)>,
146        project: Box<dyn Fn(Option<&V2>, &mut VL)>,
147        decode: Box<dyn Fn(&V2, &VL, &mut OV)>,
148    ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
149    where
150        V2: DataTrait + ?Sized,
151        VL: DataTrait + ?Sized,
152        OV: DataTrait + ?Sized,
153        B: for<'a> DynFilterMap<DynItemRef<'a> = (&'a K, &'a V)>,
154    {
155        let name = if offset > 0 {
156            format!("lag_custom_order_{offset}")
157        } else {
158            format!("lead_custom_order_{}", -offset)
159        };
160
161        self.circuit().region(&name, || {
162            self.dyn_map_index(
163                &factories.lag_factories.input_factories,
164                Box::new(move |(k, v), kv| {
165                    let (out_k, out_v) = kv.split_mut();
166                    k.clone_to(out_k);
167                    encode(v, out_v);
168                }),
169            )
170            .set_persistent_id(
171                persistent_id
172                    .map(|name| format!("{name}-ordered"))
173                    .as_deref(),
174            )
175            .dyn_lag(persistent_id, &factories.lag_factories, offset, project)
176            .dyn_map_index(
177                &factories.output_factories,
178                Box::new(move |(k, v), kv| {
179                    let (out_k, out_v) = kv.split_mut();
180                    let (v1, v2) = v.split();
181                    k.clone_to(out_k);
182                    decode(v1, v2, out_v);
183                }),
184            )
185        })
186    }
187}
188
189/// Implement both `lag` and `lead` operators.
190struct Lag<I: DataTrait + ?Sized, O: DataTrait + ?Sized, KCF> {
191    name: String,
192    lag: usize,
193    /// `true` for `lag`, `false` for `lead`.
194    asc: bool,
195    project: Box<dyn Fn(Option<&I>, &mut O)>,
196    output_pair_factory: &'static dyn Factory<DynPair<I, O>>,
197    output_val_factory: &'static dyn Factory<O>,
198    /// List of keys that must be re-evaluated, computed during
199    /// the forward pass of the algorithm.
200    affected_keys: Box<DynVec<I>>,
201    /// Keys encountered during the backward pass of the algorithm.
202    ///
203    /// See `struct EncounteredKey`.
204    encountered_keys: Vec<EncounteredKey>,
205    /// Index of the next key from `affected_keys` we expect to
206    /// encounter.
207    next_key: isize,
208    /// Number of steps the input cursor took after encountering
209    /// the previous key from `affected_keys`.
210    offset_from_prev: ZWeight,
211    /// For the purpose of computing `lag`, we iterate over a key with weight
212    /// `w` as if it occurred `w` times in the trace.  This field stores the
213    /// remaining number of steps (since it is not tracked by the cursor).
214    remaining_weight: ZWeight,
215    /// Key comparison function.  Set to `cmp` for ascending
216    /// order and the reverse of `cmp` for descending order.
217    key_cmp: KCF,
218    output_pair: Box<DynPair<I, O>>,
219    _phantom: PhantomData<fn(&I, &O)>,
220}
221
222impl<I, O, KCF> Lag<I, O, KCF>
223where
224    I: DataTrait + ?Sized,
225    O: DataTrait + ?Sized,
226    KCF: Fn(&I, &I) -> Ordering + 'static,
227{
228    #[allow(clippy::too_many_arguments)]
229    fn new(
230        output_pair_factory: &'static dyn Factory<DynPair<I, O>>,
231        keys_factory: &'static dyn Factory<DynVec<I>>,
232        output_val_factory: &'static dyn Factory<O>,
233        lag: usize,
234        asc: bool,
235        project: Box<dyn Fn(Option<&I>, &mut O)>,
236        key_cmp: KCF,
237    ) -> Self {
238        Self {
239            name: format!("{}({lag})", if asc { "lag" } else { "lead" }),
240            output_pair_factory,
241            output_val_factory,
242            lag,
243            asc,
244            project,
245            affected_keys: keys_factory.default_box(),
246            encountered_keys: Vec::new(),
247            next_key: 0,
248            offset_from_prev: 0,
249            remaining_weight: 0,
250            key_cmp,
251            output_pair: output_pair_factory.default_box(),
252            _phantom: PhantomData,
253        }
254    }
255
256    /// Add `key` to `self.affected_keys`.
257    fn record_affected_key(&mut self, key: &I) -> usize {
258        let idx = self.affected_keys.len();
259        self.affected_keys.push_ref(key);
260        self.encountered_keys.push(EncounteredKey::new());
261        idx
262    }
263
264    /// Retract all entries from the output trace whose first element is
265    /// equal to `cursor.key().fst()`; return total weight of removed records.
266    fn retract_key(
267        &mut self,
268        cursor: &mut dyn ZCursor<DynPair<I, O>, DynUnit, ()>,
269        output_cb: &mut dyn FnMut(&mut DynPair<I, O>, &mut DynZWeight),
270    ) -> usize {
271        debug_assert!(cursor.key_valid());
272
273        let mut result = 0;
274
275        let idx = self.record_affected_key(cursor.key().fst());
276
277        while cursor.key_valid() && cursor.key().fst() == self.affected_keys.index(idx) {
278            let weight = **cursor.weight();
279            // The output trace should not contain negative weights by construction.
280            debug_assert!(weight > 0);
281            cursor.key().clone_to(self.output_pair.as_mut());
282            //println!("retract: {:?} with weight {weight}", &self.output_pair);
283            output_cb(self.output_pair.as_mut(), weight.neg().erase_mut());
284            result += weight as usize;
285            cursor.step_key();
286        }
287        debug_assert!(!cursor.key_valid() || !cursor.weight().is_zero());
288
289        result
290    }
291
292    /// Move input `cursor` `steps` steps back.  Keep track of keys in the
293    /// `self.affected_keys` array encountered along the way, update their
294    /// `offset` and `weight` fields in `self.encountered_keys`.
295    fn step_key_reverse_n(&mut self, cursor: &mut dyn ZCursor<I, DynUnit, ()>, mut steps: usize) {
296        // println!(
297        //     "step_key_reverse_n {steps} (offset_from_prev = {}, remaining_weight = {})",
298        //     self.offset_from_prev, self.remaining_weight
299        // );
300        while steps > 0 && cursor.key_valid() {
301            steps -= 1;
302            self.offset_from_prev += 1;
303
304            //println!("remaining_weight: {}", self.remaining_weight);
305            if self.remaining_weight > 0 {
306                self.remaining_weight -= 1;
307            }
308
309            if self.remaining_weight == 0 {
310                step_key_reverse_skip_non_positive(cursor);
311                if cursor.key_valid() {
312                    self.remaining_weight = **cursor.weight();
313                    // println!(
314                    //     "key valid, key: {:?}, weight: {}",
315                    //     cursor.key(),
316                    //     self.remaining_weight
317                    // );
318                }
319
320                //println!("next_key: {}", self.next_key);
321
322                while self.next_key >= 0
323                    && (!cursor.key_valid()
324                        || (self.key_cmp)(
325                            cursor.key(),
326                            &self.affected_keys[self.next_key as usize],
327                        ) == Ordering::Less)
328                {
329                    //println!("skipped {:?}", &self.affected_keys[self.next_key as usize]);
330                    self.encountered_keys[self.next_key as usize].offset = Some(None);
331                    self.next_key -= 1;
332                }
333
334                if cursor.key_valid()
335                    && self.next_key >= 0
336                    && cursor.key() == &self.affected_keys[self.next_key as usize]
337                {
338                    // println!(
339                    //     "encountered {:?} at offset {} with weight {}",
340                    //     &self.affected_keys[self.next_key as usize],
341                    //     self.offset_from_prev,
342                    //     self.remaining_weight
343                    // );
344                    debug_assert!(self.offset_from_prev >= 0);
345
346                    self.encountered_keys[self.next_key as usize].offset =
347                        Some(Some(self.offset_from_prev as usize));
348                    self.offset_from_prev = -self.remaining_weight;
349                    self.encountered_keys[self.next_key as usize].weight = self.remaining_weight;
350                    self.next_key -= 1;
351                }
352            }
353        }
354    }
355
356    /// Forward pass: compute keys that require updates.  Retract them from
357    /// output trace and record the keys in `self.affected_keys`, so we can replace
358    /// them with new values during the backward pass.
359    ///
360    /// # Example
361    ///
362    /// Consider `lag(3)` operator and assume that the input collection
363    /// contains values `{1, 2, 3, 4, 6, 7, 8, 9, 10}`, all with weight 1,
364    /// and the delta contains keys `{0 => 1, 5 => 1}`.  The set
365    /// of affected keys includes all keys in delta and for each key in
366    /// delta `3` following keys in the output trace: `{0, 1, 2, 3, 5, 6, 7, 8}`.
367    ///
368    /// Consider delta equal to `{ 0 => 1, 6 => -1 }`?  The retraction of `6` affects
369    /// the next three keys, so the set of affected keys is:
370    /// `{0, 1, 2, 3, 6, 7, 8, 9}`.
371    fn compute_retractions(
372        &mut self,
373        input_delta: &mut dyn ZCursor<I, DynUnit, ()>,
374        output_trace: &mut dyn ZCursor<DynPair<I, O>, DynUnit, ()>,
375        output_cb: &mut dyn FnMut(&mut DynPair<I, O>, &mut DynZWeight),
376    ) {
377        // println!("compute retractions");
378        self.affected_keys.clear();
379        self.encountered_keys.clear();
380
381        let mut lag: usize = 0;
382
383        debug_assert!(!output_trace.key_valid() || !output_trace.weight().is_zero());
384
385        while input_delta.key_valid() && output_trace.key_valid() {
386            // - `input_delta` points to the _next_ delta key to process.
387            // - `lag` is the number of remaining retractions to perform to capture the effect
388            //   of the last processed delta.
389
390            match (self.key_cmp)(output_trace.key().fst(), input_delta.key()) {
391                Ordering::Less if lag > 0 => {
392                    // We have applied the previous delta and still have some retractions to
393                    // perform. Note that this will retract all values for this key in
394                    // `output_trace`, regardless of the value of `lag`.  The reason is that
395                    // the ordering of `(I, O)` pairs in the trace is determined by the ordering
396                    // of type `O` and not the order in which the corresponding entry that `O`
397                    // was projected from appeared in the input trace, so we don't have a way
398                    // to retract precisely the affected values only.
399                    let retractions = self.retract_key(output_trace, output_cb);
400                    lag = lag.saturating_sub(retractions);
401                }
402                Ordering::Less => {
403                    // We have processed the previous delta and all required retractions;
404                    // seek to the next key to process.
405                    let delta_key = input_delta.key();
406
407                    output_trace.seek_key_with(&|key| {
408                        (self.key_cmp)(key.fst(), delta_key) != Ordering::Less
409                    });
410                    debug_assert!(!output_trace.key_valid() || !output_trace.weight().is_zero());
411                }
412                Ordering::Equal => {
413                    // Delta modifies current key. Retract all existing values for this key
414                    // from the output trace.
415                    self.retract_key(output_trace, output_cb);
416                    lag = self.lag;
417                    input_delta.step_key();
418                }
419                Ordering::Greater => {
420                    // Record new key.
421                    self.record_affected_key(input_delta.key());
422                    lag = self.lag;
423                    input_delta.step_key();
424                }
425            }
426        }
427
428        // Finish processing remaining retractions.
429        while output_trace.key_valid() && lag > 0 {
430            let retractions = self.retract_key(output_trace, output_cb);
431            lag = lag.saturating_sub(retractions);
432        }
433
434        // Record remaining keys in `input_delta`.
435        while input_delta.key_valid() {
436            self.record_affected_key(input_delta.key());
437            input_delta.step_key();
438        }
439
440        // println!("affected_keys: {:?}", &self.affected_keys);
441    }
442
443    /// Backward pass: compute updated values for all keys in
444    /// `self.retractions`.
445    fn compute_updates<CB>(
446        &mut self,
447        input_cursor: &mut dyn ZCursor<I, DynUnit, ()>,
448        mut output_cb: CB,
449    ) where
450        CB: FnMut(&mut DynPair<I, O>, &mut DynZWeight),
451    {
452        //println!("compute updates");
453
454        input_cursor.fast_forward_keys();
455        skip_non_positive_reverse(input_cursor);
456        // println!(
457        //     "current key after fast_forward: {:?}",
458        //     input_cursor.get_key()
459        // );
460
461        // Index of the current key in the `affected_keys` array for which we are
462        // computing update.  We traverse the array backward, starting from the
463        // last element.
464        let mut current = self.affected_keys.len() as isize - 1;
465
466        // The first key in the `affected_keys` array that hasn't been observed by the
467        // cursor yet.  Once the key has been observed, it is assigned a number equal
468        // to its distance from the previous key or `self.lag` if the
469        // previous key is more than `self.lag` steps behind.
470        self.next_key = current;
471        self.offset_from_prev = 0;
472        if input_cursor.key_valid() {
473            self.remaining_weight = **input_cursor.weight();
474        }
475
476        let mut new_val = self.output_val_factory.default_box();
477        let mut output_pair = self.output_pair_factory.default_box();
478
479        while current >= 0 {
480            // println!(
481            //     "compute_updates: computing update for affected key at index {current}, next_key: {}",
482            //     self.next_key
483            // );
484
485            match self.encountered_keys[current as usize].offset {
486                Some(Some(offset)) => {
487                    // println!(
488                    //     "key: {:?} was encountered at offset {offset} with weight {}",
489                    //     &self.affected_keys.index(current as usize),
490                    //     self.encountered_keys[current as usize].weight
491                    // );
492
493                    // The key has been observed by the cursor and is known to be
494                    // exactly `offset` steps ahead of the previous key.
495                    // Move the cursor `offset` steps to point to the delayed record
496                    // in the trace.
497
498                    // update offsets in keys we encounter along the way.
499                    self.step_key_reverse_n(input_cursor, offset);
500
501                    // Output insertion.
502                    let weight = self.encountered_keys[current as usize].weight;
503                    for _i in 0..weight {
504                        (self.project)(input_cursor.get_key(), &mut new_val);
505                        output_pair.from_refs(&self.affected_keys[current as usize], &new_val);
506                        //println!("output: {:?}", &output_pair);
507                        output_cb(output_pair.as_mut(), 1.erase_mut());
508                        self.step_key_reverse_n(input_cursor, 1);
509                    }
510                    current -= 1;
511                }
512                Some(None) => {
513                    // println!(
514                    //     "key: {:?}, offset: Some(None)",
515                    //     &self.affected_keys.index(current as usize)
516                    // );
517                    // Key does not occur in the input trace.
518                    current -= 1;
519                }
520                None => {
521                    // println!(
522                    //     "key: {:?}, offset: None",
523                    //     &self.affected_keys.index(current as usize)
524                    // );
525                    // Key is ahead of the current location of the cursor.
526                    // Seek to the key.
527                    input_cursor.seek_key_reverse(&self.affected_keys[current as usize]);
528
529                    // We may have skipped over `current` and potentially multiple other keys.
530                    // All skipped keys are not in the trace.
531                    while !input_cursor.key_valid()
532                        || (self.key_cmp)(input_cursor.key(), &self.affected_keys[current as usize])
533                            == Ordering::Less
534                    {
535                        current -= 1;
536                        if current < 0 {
537                            break;
538                        }
539                    }
540
541                    if current >= 0 && &self.affected_keys[current as usize] == input_cursor.key() {
542                        // The cursor points to current key.  Move it `lag` steps to point to
543                        // the matching delayed record.
544                        self.encountered_keys[current as usize].offset = Some(Some(self.lag));
545                        self.encountered_keys[current as usize].weight = **input_cursor.weight();
546                        self.next_key = current - 1;
547                        self.remaining_weight = **input_cursor.weight();
548                        self.offset_from_prev = -self.remaining_weight;
549                    } else {
550                        // New current key is again ahead of the cursor; we'll seek to it at the
551                        // next iteration.
552                        self.next_key = current;
553                    }
554                }
555            }
556        }
557
558        // We want to reuse the allocation across multiple calls, but cap it
559        // at `MAX_RETRACTIONS_CAPACITY`.
560        if self.encountered_keys.capacity() >= MAX_RETRACTIONS_CAPACITY {
561            self.encountered_keys = Vec::new();
562            self.affected_keys.clear();
563            self.affected_keys.shrink_to_fit();
564        }
565    }
566}
567
568/// Per-key state created by the `lag` operator during forward pass.
569///
570/// * Records retractions to be applied to the output collection.
571type AffectedKeys<K> = DynVec<K>;
572
573/// Per-key state created by the `lag` operator during backward pass.
574///
575/// The cursor generally stays `lag` steps ahead of the current
576/// key for which we are computing the value of the lag function.
577/// As it moves forward (actually, backward, since this is the
578/// backward pass), it encounters keys that will be evaluated next.
579/// Since the cursor only moves in one direction, we won't get a
580/// chance to visit those keys again, so we store all info we need
581/// about the key in this struct, including
582///
583/// * The weight of each affected key.
584/// * Offset from the previous encountered affected key. Determines how far to
585///   move the cursor to reach the matching delayed record.
586#[derive(Debug)]
587struct EncounteredKey {
588    /// The weight of the key in the input collection.
589    weight: ZWeight,
590    /// Offset from the previous key in the `affected_keys` array that occurs
591    /// in the input trace.
592    ///
593    /// * `None` - key hasn't been encountered yet
594    /// * `Some(None)` - key does not occur in the input trace.
595    /// * `Some(Some(n))` - key is `n` steps away from the previous key from
596    ///   `affected_keys` that is present in the input trace.  Specifically,
597    ///   it is the sum of all positive weights between the previous key
598    ///   (exclusive) and the current key (exclusive).
599    offset: Option<Option<usize>>,
600}
601
602impl EncounteredKey {
603    fn new() -> Self {
604        Self {
605            weight: HasZero::zero(),
606            offset: None,
607        }
608    }
609}
610
611impl<I, O, KCF> GroupTransformer<I, DynPair<I, O>> for Lag<I, O, KCF>
612where
613    I: DataTrait + ?Sized,
614    O: DataTrait + ?Sized,
615    KCF: Fn(&I, &I) -> Ordering + 'static,
616{
617    fn name(&self) -> &str {
618        self.name.as_str()
619    }
620
621    fn monotonicity(&self) -> Monotonicity {
622        Monotonicity::Unordered
623    }
624
625    fn transform(
626        &mut self,
627        input_delta: &mut dyn ZCursor<I, DynUnit, ()>,
628        input_trace: &mut dyn ZCursor<I, DynUnit, ()>,
629        output_trace: &mut dyn ZCursor<DynPair<I, O>, DynUnit, ()>,
630        output_cb: &mut dyn FnMut(&mut DynPair<I, O>, &mut DynZWeight),
631    ) {
632        // {
633        //     println!("input_delta:");
634        //     while input_delta.key_valid() {
635        //         let w = **input_delta.weight();
636        //         println!("    {:?} -> {:?}", input_delta.key(), w);
637        //         input_delta.step_key();
638        //     }
639        //     input_delta.rewind_keys();
640        // }
641        // {
642        //     println!("input_trace:");
643        //     while input_trace.key_valid() {
644        //         let w = **input_trace.weight();
645        //         println!("    {:?} -> {:?}", input_trace.key(), w);
646        //         input_trace.step_key();
647        //     }
648        //     input_trace.rewind_keys();
649        // }
650        // {
651        //     println!("output_trace:");
652        //     while output_trace.key_valid() {
653        //         let w = **output_trace.weight();
654        //         println!("    {:?} -> {:?}", output_trace.key(), w);
655        //         output_trace.step_key();
656        //     }
657        //     output_trace.rewind_keys();
658        // }
659
660        if self.asc {
661            self.compute_retractions(input_delta, output_trace, output_cb);
662            self.compute_updates(&mut CursorPair::new(input_delta, input_trace), output_cb);
663        } else {
664            self.compute_retractions(
665                &mut ReverseKeyCursor::new(input_delta),
666                &mut ReverseKeyCursor::new(output_trace),
667                output_cb,
668            );
669            self.compute_updates(
670                &mut ReverseKeyCursor::new(&mut CursorPair::new(input_delta, input_trace)),
671                output_cb,
672            );
673        }
674    }
675}
676
677fn step_key_reverse_skip_non_positive<I>(cursor: &mut dyn ZCursor<I, DynUnit, ()>)
678where
679    I: DataTrait + ?Sized,
680{
681    cursor.step_key_reverse();
682    skip_non_positive_reverse(cursor)
683}
684
685fn skip_non_positive_reverse<I>(cursor: &mut dyn ZCursor<I, DynUnit, ()>)
686where
687    I: DataTrait + ?Sized,
688{
689    while cursor.key_valid() && cursor.weight().le0() {
690        cursor.step_key_reverse();
691    }
692}