// dbsp/operator/dynamic/group/lag.rs
1use super::{GroupTransformer, Monotonicity};
2use crate::Circuit;
3use crate::algebra::{OrdIndexedZSetFactories, ZRingValue};
4use crate::operator::dynamic::filter_map::DynFilterMap;
5use crate::{
6 DBData, DynZWeight, RootCircuit, Stream, ZWeight,
7 algebra::{HasZero, IndexedZSet, OrdIndexedZSet, ZCursor},
8 dynamic::{
9 ClonableTrait, DataTrait, DynData, DynPair, DynUnit, DynVec, Erase, Factory, LeanVec,
10 WithFactory,
11 },
12 operator::dynamic::MonoIndexedZSet,
13 trace::{
14 BatchReaderFactories,
15 cursor::{CursorPair, ReverseKeyCursor},
16 },
17 utils::Tup2,
18};
19use std::{cmp::Ordering, marker::PhantomData, ops::Neg};
20
/// Upper bound on the scratch-buffer capacity retained across invocations of
/// the operator; larger allocations are dropped after use (see the tail of
/// `Lag::compute_updates`) to bound steady-state memory.
const MAX_RETRACTIONS_CAPACITY: usize = 100_000usize;
22
/// Factory bundle used by [`Stream::dyn_lag`].
pub struct LagFactories<B: IndexedZSet, OV: DataTrait + ?Sized> {
    /// Factories for the input batch type `B`.
    input_factories: B::Factories,
    /// Factories for the output indexed Z-set, which pairs each input value
    /// with the projected lagged value.
    output_factories: OrdIndexedZSetFactories<B::Key, DynPair<B::Val, OV>>,
    /// Factory for the scratch vector of affected keys maintained by the
    /// operator across passes.
    keys_factory: &'static dyn Factory<AffectedKeys<B::Val>>,
    /// Factory for output values of type `OV`.
    output_val_factory: &'static dyn Factory<OV>,
}
29
30impl<B, OV> LagFactories<B, OV>
31where
32 B: IndexedZSet,
33 OV: DataTrait + ?Sized,
34{
35 pub fn new<KType, VType, OVType>() -> Self
36 where
37 KType: DBData + Erase<B::Key>,
38 VType: DBData + Erase<B::Val>,
39 OVType: DBData + Erase<OV>,
40 {
41 Self {
42 input_factories: BatchReaderFactories::new::<KType, VType, ZWeight>(),
43 output_factories: BatchReaderFactories::new::<KType, Tup2<VType, OVType>, ZWeight>(),
44 keys_factory: WithFactory::<LeanVec<VType>>::FACTORY,
45 output_val_factory: WithFactory::<OVType>::FACTORY,
46 }
47 }
48}
49
/// Factory bundle used by [`Stream::dyn_lag_custom_order`].
pub struct LagCustomOrdFactories<
    B: IndexedZSet,
    V2: DataTrait + ?Sized,
    VL: DataTrait + ?Sized,
    OV: DataTrait + ?Sized,
> {
    /// Factories for the inner `dyn_lag`, which runs over values re-encoded
    /// as `V2` (the custom-order representation).
    lag_factories: LagFactories<OrdIndexedZSet<B::Key, V2>, VL>,
    /// Factories for the final, decoded output Z-set.
    output_factories: OrdIndexedZSetFactories<B::Key, OV>,
}
59
60impl<B, V2, VL, OV> LagCustomOrdFactories<B, V2, VL, OV>
61where
62 B: IndexedZSet,
63 V2: DataTrait + ?Sized,
64 VL: DataTrait + ?Sized,
65 OV: DataTrait + ?Sized,
66{
67 pub fn new<KType, VType, V2Type, VLType, OVType>() -> Self
68 where
69 KType: DBData + Erase<B::Key>,
70 VType: DBData + Erase<B::Val>,
71 V2Type: DBData + Erase<V2>,
72 VLType: DBData + Erase<VL>,
73 OVType: DBData + Erase<OV>,
74 {
75 Self {
76 lag_factories: LagFactories::new::<KType, V2Type, VLType>(),
77 output_factories: BatchReaderFactories::new::<KType, OVType, ZWeight>(),
78 }
79 }
80}
81
82impl<B> Stream<RootCircuit, B>
83where
84 B: IndexedZSet + Send,
85{
86 /// See [`Stream::lag`].
87 #[allow(clippy::type_complexity)]
88 pub fn dyn_lag<OV>(
89 &self,
90 persistent_id: Option<&str>,
91 factories: &LagFactories<B, OV>,
92 offset: isize,
93 project: Box<dyn Fn(Option<&B::Val>, &mut OV)>,
94 ) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, DynPair<B::Val, OV>>>
95 where
96 OV: DataTrait + ?Sized,
97 {
98 self.dyn_group_transform(
99 persistent_id,
100 &factories.input_factories,
101 &factories.output_factories,
102 Box::new(Lag::new(
103 factories.output_factories.val_factory(),
104 factories.keys_factory,
105 factories.output_val_factory,
106 offset.unsigned_abs(),
107 offset > 0,
108 project,
109 if offset > 0 {
110 |k1: &B::Val, k2: &B::Val| k1.cmp(k2)
111 } else {
112 |k1: &B::Val, k2: &B::Val| k2.cmp(k1)
113 },
114 )),
115 )
116 }
117}
118
impl Stream<RootCircuit, MonoIndexedZSet> {
    /// Monomorphic wrapper around [`Self::dyn_lag_custom_order`] where every
    /// value type is fully dynamic (`DynData`). Delegates directly with no
    /// additional logic.
    pub fn dyn_lag_custom_order_mono(
        &self,
        persistent_id: Option<&str>,
        factories: &LagCustomOrdFactories<MonoIndexedZSet, DynData, DynData, DynData>,
        offset: isize,
        encode: Box<dyn Fn(&DynData, &mut DynData)>,
        project: Box<dyn Fn(Option<&DynData>, &mut DynData)>,
        decode: Box<dyn Fn(&DynData, &DynData, &mut DynData)>,
    ) -> Stream<RootCircuit, MonoIndexedZSet> {
        self.dyn_lag_custom_order(persistent_id, factories, offset, encode, project, decode)
    }
}
132
133impl<B, K, V> Stream<RootCircuit, B>
134where
135 B: IndexedZSet<Key = K, Val = V> + Send,
136 K: DataTrait + ?Sized,
137 V: DataTrait + ?Sized,
138{
139 /// See [`Stream::lag_custom_order`].
140 pub fn dyn_lag_custom_order<V2, VL, OV>(
141 &self,
142 persistent_id: Option<&str>,
143 factories: &LagCustomOrdFactories<B, V2, VL, OV>,
144 offset: isize,
145 encode: Box<dyn Fn(&V, &mut V2)>,
146 project: Box<dyn Fn(Option<&V2>, &mut VL)>,
147 decode: Box<dyn Fn(&V2, &VL, &mut OV)>,
148 ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
149 where
150 V2: DataTrait + ?Sized,
151 VL: DataTrait + ?Sized,
152 OV: DataTrait + ?Sized,
153 B: for<'a> DynFilterMap<DynItemRef<'a> = (&'a K, &'a V)>,
154 {
155 let name = if offset > 0 {
156 format!("lag_custom_order_{offset}")
157 } else {
158 format!("lead_custom_order_{}", -offset)
159 };
160
161 self.circuit().region(&name, || {
162 self.dyn_map_index(
163 &factories.lag_factories.input_factories,
164 Box::new(move |(k, v), kv| {
165 let (out_k, out_v) = kv.split_mut();
166 k.clone_to(out_k);
167 encode(v, out_v);
168 }),
169 )
170 .set_persistent_id(
171 persistent_id
172 .map(|name| format!("{name}-ordered"))
173 .as_deref(),
174 )
175 .dyn_lag(persistent_id, &factories.lag_factories, offset, project)
176 .dyn_map_index(
177 &factories.output_factories,
178 Box::new(move |(k, v), kv| {
179 let (out_k, out_v) = kv.split_mut();
180 let (v1, v2) = v.split();
181 k.clone_to(out_k);
182 decode(v1, v2, out_v);
183 }),
184 )
185 })
186 }
187}
188
/// Implements both the `lag` and `lead` operators.
///
/// `lead` is evaluated as `lag` over reverse-ordered cursors (see
/// `GroupTransformer::transform` below), so one struct serves both; the
/// `asc` flag records which variant this instance computes.
struct Lag<I: DataTrait + ?Sized, O: DataTrait + ?Sized, KCF> {
    /// Operator name, e.g., `"lag(3)"` or `"lead(3)"`; used for diagnostics.
    name: String,
    /// Absolute offset: how many positions behind (or ahead) to look.
    lag: usize,
    /// `true` for `lag`, `false` for `lead`.
    asc: bool,
    /// Projects the delayed value (`None` when out of range) into the
    /// output value.
    project: Box<dyn Fn(Option<&I>, &mut O)>,
    /// Factory for `(input value, output value)` pairs emitted downstream.
    output_pair_factory: &'static dyn Factory<DynPair<I, O>>,
    /// Factory for output values of type `O`.
    output_val_factory: &'static dyn Factory<O>,
    /// List of keys that must be re-evaluated, computed during
    /// the forward pass of the algorithm.
    affected_keys: Box<DynVec<I>>,
    /// Keys encountered during the backward pass of the algorithm.
    ///
    /// See `struct EncounteredKey`.
    encountered_keys: Vec<EncounteredKey>,
    /// Index of the next key from `affected_keys` we expect to
    /// encounter.
    next_key: isize,
    /// Number of steps the input cursor took after encountering
    /// the previous key from `affected_keys`.
    offset_from_prev: ZWeight,
    /// For the purpose of computing `lag`, we iterate over a key with weight
    /// `w` as if it occurred `w` times in the trace. This field stores the
    /// remaining number of steps (since it is not tracked by the cursor).
    remaining_weight: ZWeight,
    /// Key comparison function. Set to `cmp` for ascending
    /// order and the reverse of `cmp` for descending order.
    key_cmp: KCF,
    /// Scratch pair reused when emitting retractions.
    output_pair: Box<DynPair<I, O>>,
    _phantom: PhantomData<fn(&I, &O)>,
}
221
222impl<I, O, KCF> Lag<I, O, KCF>
223where
224 I: DataTrait + ?Sized,
225 O: DataTrait + ?Sized,
226 KCF: Fn(&I, &I) -> Ordering + 'static,
227{
228 #[allow(clippy::too_many_arguments)]
229 fn new(
230 output_pair_factory: &'static dyn Factory<DynPair<I, O>>,
231 keys_factory: &'static dyn Factory<DynVec<I>>,
232 output_val_factory: &'static dyn Factory<O>,
233 lag: usize,
234 asc: bool,
235 project: Box<dyn Fn(Option<&I>, &mut O)>,
236 key_cmp: KCF,
237 ) -> Self {
238 Self {
239 name: format!("{}({lag})", if asc { "lag" } else { "lead" }),
240 output_pair_factory,
241 output_val_factory,
242 lag,
243 asc,
244 project,
245 affected_keys: keys_factory.default_box(),
246 encountered_keys: Vec::new(),
247 next_key: 0,
248 offset_from_prev: 0,
249 remaining_weight: 0,
250 key_cmp,
251 output_pair: output_pair_factory.default_box(),
252 _phantom: PhantomData,
253 }
254 }
255
256 /// Add `key` to `self.affected_keys`.
257 fn record_affected_key(&mut self, key: &I) -> usize {
258 let idx = self.affected_keys.len();
259 self.affected_keys.push_ref(key);
260 self.encountered_keys.push(EncounteredKey::new());
261 idx
262 }
263
    /// Retract all entries from the output trace whose first element is
    /// equal to `cursor.key().fst()`; return total weight of removed records.
    ///
    /// The key is also recorded in `self.affected_keys`: a key whose old
    /// outputs were retracted must be recomputed during the backward pass.
    fn retract_key(
        &mut self,
        cursor: &mut dyn ZCursor<DynPair<I, O>, DynUnit, ()>,
        output_cb: &mut dyn FnMut(&mut DynPair<I, O>, &mut DynZWeight),
    ) -> usize {
        debug_assert!(cursor.key_valid());

        let mut result = 0;

        // Remember the key so the backward pass emits fresh values for it.
        let idx = self.record_affected_key(cursor.key().fst());

        // Emit a negative-weight copy of every `(key, old_value)` pair.
        while cursor.key_valid() && cursor.key().fst() == self.affected_keys.index(idx) {
            let weight = **cursor.weight();
            // The output trace should not contain negative weights by construction.
            debug_assert!(weight > 0);
            cursor.key().clone_to(self.output_pair.as_mut());
            output_cb(self.output_pair.as_mut(), weight.neg().erase_mut());
            result += weight as usize;
            cursor.step_key();
        }
        debug_assert!(!cursor.key_valid() || !cursor.weight().is_zero());

        result
    }
291
    /// Move input `cursor` `steps` steps back. Keep track of keys in the
    /// `self.affected_keys` array encountered along the way, update their
    /// `offset` and `weight` fields in `self.encountered_keys`.
    ///
    /// A key with weight `w` counts as `w` consecutive steps; the partially
    /// consumed weight of the current key lives in `self.remaining_weight`.
    fn step_key_reverse_n(&mut self, cursor: &mut dyn ZCursor<I, DynUnit, ()>, mut steps: usize) {
        while steps > 0 && cursor.key_valid() {
            steps -= 1;
            self.offset_from_prev += 1;

            // Consume one "virtual copy" of the current key before moving
            // the cursor.
            if self.remaining_weight > 0 {
                self.remaining_weight -= 1;
            }

            if self.remaining_weight == 0 {
                // All copies of the current key consumed: move back to the
                // previous key with positive weight.
                step_key_reverse_skip_non_positive(cursor);
                if cursor.key_valid() {
                    self.remaining_weight = **cursor.weight();
                }

                // Affected keys that lie strictly between the old and new
                // cursor positions do not occur in the input trace: mark
                // them as absent (`Some(None)`).
                while self.next_key >= 0
                    && (!cursor.key_valid()
                        || (self.key_cmp)(
                            cursor.key(),
                            &self.affected_keys[self.next_key as usize],
                        ) == Ordering::Less)
                {
                    self.encountered_keys[self.next_key as usize].offset = Some(None);
                    self.next_key -= 1;
                }

                // The cursor landed exactly on the next expected affected key:
                // record its distance from the previous affected key and its
                // weight.
                if cursor.key_valid()
                    && self.next_key >= 0
                    && cursor.key() == &self.affected_keys[self.next_key as usize]
                {
                    debug_assert!(self.offset_from_prev >= 0);

                    self.encountered_keys[self.next_key as usize].offset =
                        Some(Some(self.offset_from_prev as usize));
                    // Start the next count from `-weight` so that the next
                    // key's offset accounts for every copy of this one.
                    self.offset_from_prev = -self.remaining_weight;
                    self.encountered_keys[self.next_key as usize].weight = self.remaining_weight;
                    self.next_key -= 1;
                }
            }
        }
    }
355
    /// Forward pass: compute keys that require updates. Retract them from the
    /// output trace and record the keys in `self.affected_keys`, so we can
    /// replace them with new values during the backward pass.
    ///
    /// # Example
    ///
    /// Consider the `lag(3)` operator and assume that the input collection
    /// contains values `{1, 2, 3, 4, 6, 7, 8, 9, 10}`, all with weight 1,
    /// and the delta contains keys `{0 => 1, 5 => 1}`. The set
    /// of affected keys includes all keys in delta and, for each key in
    /// delta, the `3` following keys in the output trace: `{0, 1, 2, 3, 5, 6, 7, 8}`.
    ///
    /// Now consider a delta equal to `{ 0 => 1, 6 => -1 }`. The retraction of `6`
    /// affects the next three keys, so the set of affected keys is:
    /// `{0, 1, 2, 3, 6, 7, 8, 9}`.
    fn compute_retractions(
        &mut self,
        input_delta: &mut dyn ZCursor<I, DynUnit, ()>,
        output_trace: &mut dyn ZCursor<DynPair<I, O>, DynUnit, ()>,
        output_cb: &mut dyn FnMut(&mut DynPair<I, O>, &mut DynZWeight),
    ) {
        self.affected_keys.clear();
        self.encountered_keys.clear();

        // Number of retractions still owed for the most recently processed
        // delta key.
        let mut lag: usize = 0;

        debug_assert!(!output_trace.key_valid() || !output_trace.weight().is_zero());

        while input_delta.key_valid() && output_trace.key_valid() {
            // Loop invariants:
            // - `input_delta` points to the _next_ delta key to process.
            // - `lag` is the number of remaining retractions to perform to capture the effect
            //   of the last processed delta.

            match (self.key_cmp)(output_trace.key().fst(), input_delta.key()) {
                Ordering::Less if lag > 0 => {
                    // We have applied the previous delta and still have some retractions to
                    // perform. Note that this will retract all values for this key in
                    // `output_trace`, regardless of the value of `lag`. The reason is that
                    // the ordering of `(I, O)` pairs in the trace is determined by the ordering
                    // of type `O` and not the order in which the corresponding entry that `O`
                    // was projected from appeared in the input trace, so we don't have a way
                    // to retract precisely the affected values only.
                    let retractions = self.retract_key(output_trace, output_cb);
                    lag = lag.saturating_sub(retractions);
                }
                Ordering::Less => {
                    // We have processed the previous delta and all required retractions;
                    // seek to the next key to process.
                    let delta_key = input_delta.key();

                    output_trace.seek_key_with(&|key| {
                        (self.key_cmp)(key.fst(), delta_key) != Ordering::Less
                    });
                    debug_assert!(!output_trace.key_valid() || !output_trace.weight().is_zero());
                }
                Ordering::Equal => {
                    // Delta modifies current key. Retract all existing values for this key
                    // from the output trace.
                    self.retract_key(output_trace, output_cb);
                    lag = self.lag;
                    input_delta.step_key();
                }
                Ordering::Greater => {
                    // Key exists in the delta but not (yet) in the output trace:
                    // record it as a new affected key.
                    self.record_affected_key(input_delta.key());
                    lag = self.lag;
                    input_delta.step_key();
                }
            }
        }

        // Finish processing remaining retractions.
        while output_trace.key_valid() && lag > 0 {
            let retractions = self.retract_key(output_trace, output_cb);
            lag = lag.saturating_sub(retractions);
        }

        // Record remaining keys in `input_delta`.
        while input_delta.key_valid() {
            self.record_affected_key(input_delta.key());
            input_delta.step_key();
        }
    }
442
    /// Backward pass: compute updated values for all keys in
    /// `self.affected_keys`.
    fn compute_updates<CB>(
        &mut self,
        input_cursor: &mut dyn ZCursor<I, DynUnit, ()>,
        mut output_cb: CB,
    ) where
        CB: FnMut(&mut DynPair<I, O>, &mut DynZWeight),
    {
        // Start from the last key with positive weight and walk backward.
        input_cursor.fast_forward_keys();
        skip_non_positive_reverse(input_cursor);

        // Index of the current key in the `affected_keys` array for which we are
        // computing update. We traverse the array backward, starting from the
        // last element.
        let mut current = self.affected_keys.len() as isize - 1;

        // The first key in the `affected_keys` array that hasn't been observed by the
        // cursor yet. Once the key has been observed, it is assigned a number equal
        // to its distance from the previous key or `self.lag` if the
        // previous key is more than `self.lag` steps behind.
        self.next_key = current;
        self.offset_from_prev = 0;
        if input_cursor.key_valid() {
            self.remaining_weight = **input_cursor.weight();
        }

        // Scratch buffers reused across all emitted records.
        let mut new_val = self.output_val_factory.default_box();
        let mut output_pair = self.output_pair_factory.default_box();

        while current >= 0 {
            match self.encountered_keys[current as usize].offset {
                Some(Some(offset)) => {
                    // The key has been observed by the cursor and is known to be
                    // exactly `offset` steps ahead of the previous key.
                    // Move the cursor `offset` steps to point to the delayed record
                    // in the trace.

                    // This also updates offsets in keys we encounter along the way.
                    self.step_key_reverse_n(input_cursor, offset);

                    // Output one insertion per unit of weight: a key with weight
                    // `w` produces `w` output records, each one step further back.
                    let weight = self.encountered_keys[current as usize].weight;
                    for _i in 0..weight {
                        (self.project)(input_cursor.get_key(), &mut new_val);
                        output_pair.from_refs(&self.affected_keys[current as usize], &new_val);
                        output_cb(output_pair.as_mut(), 1.erase_mut());
                        self.step_key_reverse_n(input_cursor, 1);
                    }
                    current -= 1;
                }
                Some(None) => {
                    // Key does not occur in the input trace; nothing to emit.
                    current -= 1;
                }
                None => {
                    // Key is ahead of the current location of the cursor.
                    // Seek to the key.
                    input_cursor.seek_key_reverse(&self.affected_keys[current as usize]);

                    // We may have skipped over `current` and potentially multiple other keys.
                    // All skipped keys are not in the trace.
                    while !input_cursor.key_valid()
                        || (self.key_cmp)(input_cursor.key(), &self.affected_keys[current as usize])
                            == Ordering::Less
                    {
                        current -= 1;
                        if current < 0 {
                            break;
                        }
                    }

                    if current >= 0 && &self.affected_keys[current as usize] == input_cursor.key() {
                        // The cursor points to current key. Move it `lag` steps to point to
                        // the matching delayed record.
                        self.encountered_keys[current as usize].offset = Some(Some(self.lag));
                        self.encountered_keys[current as usize].weight = **input_cursor.weight();
                        self.next_key = current - 1;
                        self.remaining_weight = **input_cursor.weight();
                        self.offset_from_prev = -self.remaining_weight;
                    } else {
                        // New current key is again ahead of the cursor; we'll seek to it at the
                        // next iteration.
                        self.next_key = current;
                    }
                }
            }
        }

        // We want to reuse the allocation across multiple calls, but cap it
        // at `MAX_RETRACTIONS_CAPACITY`.
        if self.encountered_keys.capacity() >= MAX_RETRACTIONS_CAPACITY {
            self.encountered_keys = Vec::new();
            self.affected_keys.clear();
            self.affected_keys.shrink_to_fit();
        }
    }
566}
567
/// Scratch state built by the `lag` operator during the forward pass.
///
/// Records the keys whose old outputs were retracted (or that are new in the
/// delta) and therefore must be recomputed during the backward pass.
type AffectedKeys<K> = DynVec<K>;
572
/// Per-key state created by the `lag` operator during the backward pass.
///
/// The cursor generally stays `lag` steps ahead of the current
/// key for which we are computing the value of the lag function.
/// As it moves forward (actually, backward, since this is the
/// backward pass), it encounters keys that will be evaluated next.
/// Since the cursor only moves in one direction, we won't get a
/// chance to visit those keys again, so we store all info we need
/// about the key in this struct, including:
///
/// * The weight of each affected key.
/// * Offset from the previous encountered affected key. Determines how far to
///   move the cursor to reach the matching delayed record.
#[derive(Debug)]
struct EncounteredKey {
    /// The weight of the key in the input collection.
    weight: ZWeight,
    /// Offset from the previous key in the `affected_keys` array that occurs
    /// in the input trace.
    ///
    /// * `None` - key hasn't been encountered yet.
    /// * `Some(None)` - key does not occur in the input trace.
    /// * `Some(Some(n))` - key is `n` steps away from the previous key from
    ///   `affected_keys` that is present in the input trace. Specifically,
    ///   it is the sum of all positive weights between the previous key
    ///   (exclusive) and the current key (exclusive).
    offset: Option<Option<usize>>,
}
601
602impl EncounteredKey {
603 fn new() -> Self {
604 Self {
605 weight: HasZero::zero(),
606 offset: None,
607 }
608 }
609}
610
impl<I, O, KCF> GroupTransformer<I, DynPair<I, O>> for Lag<I, O, KCF>
where
    I: DataTrait + ?Sized,
    O: DataTrait + ?Sized,
    KCF: Fn(&I, &I) -> Ordering + 'static,
{
    fn name(&self) -> &str {
        self.name.as_str()
    }

    fn monotonicity(&self) -> Monotonicity {
        // Updates are emitted during the backward pass, not in key order.
        Monotonicity::Unordered
    }

    /// Incrementally evaluate lag/lead for one group: the forward pass
    /// retracts stale outputs from `output_trace`, and the backward pass,
    /// running over the combined `input_delta` + `input_trace` cursor,
    /// emits fresh values for all affected keys.
    ///
    /// For `lead` (`!self.asc`), both passes run over reverse-order cursors,
    /// which turns `lead` into `lag` over the reversed key order.
    fn transform(
        &mut self,
        input_delta: &mut dyn ZCursor<I, DynUnit, ()>,
        input_trace: &mut dyn ZCursor<I, DynUnit, ()>,
        output_trace: &mut dyn ZCursor<DynPair<I, O>, DynUnit, ()>,
        output_cb: &mut dyn FnMut(&mut DynPair<I, O>, &mut DynZWeight),
    ) {
        if self.asc {
            self.compute_retractions(input_delta, output_trace, output_cb);
            // `CursorPair` presents delta + trace as a single cursor over the
            // updated input collection.
            self.compute_updates(&mut CursorPair::new(input_delta, input_trace), output_cb);
        } else {
            self.compute_retractions(
                &mut ReverseKeyCursor::new(input_delta),
                &mut ReverseKeyCursor::new(output_trace),
                output_cb,
            );
            self.compute_updates(
                &mut ReverseKeyCursor::new(&mut CursorPair::new(input_delta, input_trace)),
                output_cb,
            );
        }
    }
}
676
/// Step `cursor` back once, then keep stepping back past any keys with
/// non-positive weight, leaving the cursor either invalid or on a key
/// with positive weight.
fn step_key_reverse_skip_non_positive<I>(cursor: &mut dyn ZCursor<I, DynUnit, ()>)
where
    I: DataTrait + ?Sized,
{
    cursor.step_key_reverse();
    skip_non_positive_reverse(cursor)
}
684
685fn skip_non_positive_reverse<I>(cursor: &mut dyn ZCursor<I, DynUnit, ()>)
686where
687 I: DataTrait + ?Sized,
688{
689 while cursor.key_valid() && cursor.weight().le0() {
690 cursor.step_key_reverse();
691 }
692}