Skip to main content

dbsp/operator/dynamic/
trace.rs

1use crate::circuit::circuit_builder::{StreamId, register_replay_stream};
2use crate::circuit::metadata::{INPUT_RECORDS_COUNT, MEMORY_ALLOCATIONS_COUNT, RETAINMENT_BOUNDS};
3use crate::circuit::splitter_output_chunk_size;
4use crate::dynamic::{Factory, Weight, WeightTrait};
5use crate::operator::require_persistent_id;
6use crate::trace::spine_async::WithSnapshot;
7use crate::trace::{BatchReaderFactories, Builder, GroupFilter, MergeCursor};
8use crate::{
9    Error, Timestamp,
10    circuit::{
11        Circuit, ExportId, ExportStream, FeedbackConnector, GlobalNodeId, OwnershipPreference,
12        Scope, Stream, WithClock,
13        metadata::{
14            ALLOCATED_MEMORY_BYTES, MetaItem, OperatorMeta, SHARED_MEMORY_BYTES,
15            STATE_RECORDS_COUNT, USED_MEMORY_BYTES,
16        },
17        operator_traits::{BinaryOperator, Operator, StrictOperator, StrictUnaryOperator},
18    },
19    circuit_cache_key,
20    dynamic::DataTrait,
21    trace::{Batch, BatchReader, Filter, Spine, SpineSnapshot, Trace},
22};
23use dyn_clone::clone_box;
24use feldera_storage::{FileCommitter, StoragePath};
25use ouroboros::self_referencing;
26use size_of::SizeOf;
27use std::any::TypeId;
28use std::collections::BTreeMap;
29use std::mem::transmute;
30use std::{
31    borrow::Cow,
32    cell::{Ref, RefCell},
33    cmp::Ordering,
34    fmt::Debug,
35    marker::PhantomData,
36    ops::Deref,
37    rc::Rc,
38    sync::Arc,
39};
40
// Cache key: input stream id -> the integrated trace stream built for it
// (populated by `dyn_trace_with_bound`, `integrate_trace_inner` and
// `TraceFeedbackConnector::connect`).
circuit_cache_key!(TraceId<C, D: BatchReader>(StreamId => Stream<C, D>));
// Cache key: stream id -> the shared `TraceBounds` (retainment policies) of the
// trace built from that stream.
circuit_cache_key!(BoundsId<D: BatchReader>(StreamId => TraceBounds<<D as BatchReader>::Key, <D as BatchReader>::Val>));

// Trace of a collection delayed by one step.
// Keyed by the id of the *trace* stream (not the input stream); read back by
// `Stream::delay_trace`.
circuit_cache_key!(DelayedTraceId<C, D>(StreamId => Stream<C, D>));
46
/// Shared lower bound on the keys or values retained by a trace.
///
/// An unset bound (`None`) behaves like `T::min_value()`: it never causes the
/// trace to be truncated.
///
/// The writer may update the bound once per clock cycle, and each update
/// must be monotonically non-decreasing.
#[repr(transparent)]
pub struct TraceBound<T: ?Sized>(
    // Shared, interior-mutable slot holding the current bound, if any.
    Rc<RefCell<Option<Box<T>>>>,
);
57
58impl<T: DataTrait + ?Sized> Clone for TraceBound<T> {
59    fn clone(&self) -> Self {
60        Self(self.0.clone())
61    }
62}
63
64impl<T: Debug + ?Sized> Debug for TraceBound<T> {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        self.0.borrow().fmt(f)
67    }
68}
69
70impl<T: DataTrait + ?Sized> PartialEq for TraceBound<T> {
71    fn eq(&self, other: &Self) -> bool {
72        self.0 == other.0
73    }
74}
75
76impl<T: DataTrait + ?Sized> Eq for TraceBound<T> {}
77
78impl<T: DataTrait + ?Sized> PartialOrd for TraceBound<T> {
79    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
80        Some(self.cmp(other))
81    }
82}
83
84impl<T: DataTrait + ?Sized> Ord for TraceBound<T> {
85    fn cmp(&self, other: &Self) -> Ordering {
86        self.0.cmp(&other.0)
87    }
88}
89
90impl<K: DataTrait + ?Sized> Default for TraceBound<K> {
91    fn default() -> Self {
92        Self(Rc::new(RefCell::new(None)))
93    }
94}
95
96impl<K: DataTrait + ?Sized> TraceBound<K> {
97    pub fn new() -> Self {
98        Default::default()
99    }
100
101    /// Set the new value of the bound.
102    pub fn set(&self, bound: Box<K>) {
103        //debug_assert!(self.0.borrow().as_ref() <= Some(&bound));
104        *self.0.borrow_mut() = Some(bound);
105    }
106
107    /// Get the current value of the bound.
108    pub fn get(&self) -> Ref<'_, Option<Box<K>>> {
109        (*self.0).borrow()
110    }
111}
112
113/// Data structure that tracks key and value retainment policies for a
114/// trace.
115pub struct TraceBounds<K: ?Sized + 'static, V: DataTrait + ?Sized>(
116    Rc<RefCell<TraceBoundsInner<K, V>>>,
117);
118
119impl<K: DataTrait + ?Sized, V: DataTrait + ?Sized> Clone for TraceBounds<K, V> {
120    fn clone(&self) -> Self {
121        Self(self.0.clone())
122    }
123}
124
125impl<K: DataTrait + ?Sized, V: DataTrait + ?Sized> TraceBounds<K, V> {
126    /// Instantiate `TraceBounds` with empty sets of key and value bounds.
127    ///
128    /// The caller must add at least one key and one value bound before
129    /// running the circuit.
130    pub(crate) fn new() -> Self {
131        Self(Rc::new(RefCell::new(TraceBoundsInner {
132            key_predicate: Predicate::Bounds(Vec::new()),
133            unique_key_name: None,
134            val_predicate: GroupPredicate::Bounds(Vec::new()),
135            unique_val_name: None,
136        })))
137    }
138
139    /// Returns `TraceBounds` that prevent any values in the trace from
140    /// being truncated.
141    pub(crate) fn unbounded() -> Self {
142        Self(Rc::new(RefCell::new(TraceBoundsInner {
143            key_predicate: Predicate::Bounds(vec![TraceBound::new()]),
144            unique_key_name: None,
145            val_predicate: GroupPredicate::Bounds(vec![TraceBound::new()]),
146            unique_val_name: None,
147        })))
148    }
149
150    pub(crate) fn add_key_bound(&self, bound: TraceBound<K>) {
151        match &mut self.0.borrow_mut().key_predicate {
152            Predicate::Bounds(bounds) => bounds.push(bound),
153            Predicate::Filter(_) => {}
154        };
155    }
156
157    /// Set key retainment condition.  Disables any key bounds
158    /// set using [`Self::add_key_bound`].
159    pub(crate) fn set_key_filter(&self, filter: Filter<K>) {
160        self.0.borrow_mut().key_predicate = Predicate::Filter(filter);
161    }
162
163    pub(crate) fn set_unique_key_bound_name(&self, unique_name: Option<&str>) {
164        self.0.borrow_mut().unique_key_name = unique_name.map(str::to_string);
165    }
166
167    pub(crate) fn add_val_bound(&self, bound: TraceBound<V>) {
168        match &mut self.0.borrow_mut().val_predicate {
169            GroupPredicate::Bounds(bounds) => bounds.push(bound),
170            GroupPredicate::Filter(_) => {}
171        };
172    }
173
174    /// Set value retainment condition.  Disables any value bounds
175    /// set using [`Self::add_val_bound`].
176    pub(crate) fn set_val_filter(&self, filter: GroupFilter<V>) {
177        self.0.borrow_mut().val_predicate = GroupPredicate::Filter(filter);
178    }
179
180    pub(crate) fn set_unique_val_bound_name(&self, unique_name: Option<&str>) {
181        self.0.borrow_mut().unique_val_name = unique_name.map(str::to_string);
182    }
183
184    /// Returns effective key retention condition, computed as the
185    /// minimum bound installed using [`Self::add_val_bound`] or as the
186    /// condition installed using [`Self::set_val_filter`] (the latter
187    /// takes precedence).
188    pub(crate) fn effective_key_filter(&self) -> Option<Filter<K>> {
189        match &(*self.0).borrow().key_predicate {
190            Predicate::Bounds(bounds) => bounds
191                .iter()
192                .min()
193                .expect("At least one trace bound must be set")
194                .get()
195                .deref()
196                .as_ref()
197                .map(|bx| Arc::from(clone_box(bx.as_ref())))
198                .map(|bound: Arc<K>| {
199                    let metadata = MetaItem::String(format!("{bound:?}"));
200                    Filter::new(Box::new(move |k: &K| {
201                        bound.as_ref().cmp(k) != Ordering::Greater
202                    }))
203                    .with_metadata(metadata)
204                }),
205            Predicate::Filter(filter) => Some(filter.clone()),
206        }
207    }
208
209    /// Returns effective value retention condition, computed as the
210    /// minimum bound installed using [`Self::add_val_bound`] or as the
211    /// condition installed using [`Self::set_val_filter`] (the latter
212    /// takes precedence).
213    pub(crate) fn effective_val_filter(&self) -> Option<GroupFilter<V>> {
214        match &(*self.0).borrow().val_predicate {
215            GroupPredicate::Bounds(bounds) => bounds
216                .iter()
217                .min()
218                .expect("At least one trace bound must be set")
219                .get()
220                .deref()
221                .as_ref()
222                .map(|bx| Arc::from(clone_box(bx.as_ref())))
223                .map(|bound: Arc<V>| {
224                    let metadata = MetaItem::String(format!("{bound:?}"));
225                    GroupFilter::Simple(
226                        Filter::new(Box::new(move |v: &V| {
227                            bound.as_ref().cmp(v) != Ordering::Greater
228                        }))
229                        .with_metadata(metadata),
230                    )
231                }),
232            GroupPredicate::Filter(filter) => Some(filter.clone()),
233        }
234    }
235
236    pub(crate) fn metadata(&self) -> MetaItem {
237        self.0.borrow().metadata()
238    }
239}
240
241/// Value retainment predicate defined as either a set of bounds
242/// or a filter condition.
243///
244/// See [`Stream::dyn_integrate_trace_retain_keys`] for details.
245enum Predicate<V: ?Sized> {
246    Bounds(Vec<TraceBound<V>>),
247    Filter(Filter<V>),
248}
249
250impl<V: Debug + ?Sized> Predicate<V> {
251    pub fn metadata(&self) -> MetaItem {
252        match self {
253            Self::Bounds(bounds) => MetaItem::Array(
254                bounds
255                    .iter()
256                    .map(|b| MetaItem::String(format!("{b:?}")))
257                    .collect(),
258            ),
259            Self::Filter(filter) => filter.metadata().clone(),
260        }
261    }
262}
263
264enum GroupPredicate<V: DataTrait + ?Sized> {
265    /// Discard values that are less than at least one of the bounds in the vector.
266    Bounds(Vec<TraceBound<V>>),
267    /// Discard values that don't satisfy the filter.
268    Filter(GroupFilter<V>),
269}
270
271impl<V: DataTrait + ?Sized> GroupPredicate<V> {
272    pub fn metadata(&self) -> MetaItem {
273        match self {
274            Self::Bounds(bounds) => MetaItem::Array(
275                bounds
276                    .iter()
277                    .map(|b| MetaItem::String(format!("{b:?}")))
278                    .collect(),
279            ),
280            Self::Filter(filter) => filter.metadata().clone(),
281        }
282    }
283}
284
285struct TraceBoundsInner<K: ?Sized + 'static, V: DataTrait + ?Sized> {
286    /// Key bounds _or_ retainment condition.
287    key_predicate: Predicate<K>,
288    unique_key_name: Option<String>,
289    /// Value bounds _or_ retainment condition.
290    val_predicate: GroupPredicate<V>,
291    unique_val_name: Option<String>,
292}
293
294impl<K: Debug + ?Sized + 'static, V: DataTrait + ?Sized> TraceBoundsInner<K, V> {
295    pub fn metadata(&self) -> MetaItem {
296        MetaItem::Map(BTreeMap::from([
297            (Cow::Borrowed("key"), self.key_predicate.metadata()),
298            (Cow::Borrowed("value"), self.val_predicate.metadata()),
299        ]))
300    }
301}
302
/// Trace type produced by [`Stream::dyn_trace`]: a spine of batches timed
/// with the clock time type of circuit `C`.
pub type TimedSpine<B, C> = Spine<<<C as WithClock>::Time as Timestamp>::TimedBatch<B>>;
304
305impl<C, B> Stream<C, B>
306where
307    C: Circuit,
308    B: Clone + Send + Sync + 'static,
309{
310    /// See [`Stream::trace`].
311    pub fn dyn_trace(
312        &self,
313        output_factories: &<TimedSpine<B, C> as BatchReader>::Factories,
314        batch_factories: &B::Factories,
315    ) -> Stream<C, TimedSpine<B, C>>
316    where
317        B: Batch<Time = ()>,
318    {
319        self.dyn_trace_with_bound(
320            output_factories,
321            batch_factories,
322            TraceBound::new(),
323            TraceBound::new(),
324        )
325    }
326
327    /// See [`Stream::trace_with_bound`].
328    pub fn dyn_trace_with_bound(
329        &self,
330        output_factories: &<TimedSpine<B, C> as BatchReader>::Factories,
331        batch_factories: &B::Factories,
332        lower_key_bound: TraceBound<B::Key>,
333        lower_val_bound: TraceBound<B::Val>,
334    ) -> Stream<C, TimedSpine<B, C>>
335    where
336        B: Batch<Time = ()>,
337    {
338        let bounds = self.trace_bounds_with_bound(lower_key_bound, lower_val_bound);
339
340        self.circuit()
341            .cache_get_or_insert_with(TraceId::new(self.stream_id()), || {
342                let circuit = self.circuit();
343
344                circuit.region("trace", || {
345                    let persistent_id = self.get_persistent_id();
346                    let z1 = Z1Trace::new(
347                        output_factories,
348                        batch_factories,
349                        false,
350                        circuit.root_scope(),
351                        bounds.clone(),
352                    );
353                    let (delayed_trace, z1feedback) = circuit.add_feedback_persistent(
354                        persistent_id
355                            .map(|name| format!("{name}.integral"))
356                            .as_deref(),
357                        z1,
358                    );
359
360                    let replay_stream = z1feedback.operator_mut().prepare_replay_stream(self);
361
362                    let trace = circuit.add_binary_operator_with_preference(
363                        <TraceAppend<TimedSpine<B, C>, B, C>>::new(
364                            output_factories,
365                            circuit.clone(),
366                        ),
367                        (&delayed_trace, OwnershipPreference::STRONGLY_PREFER_OWNED),
368                        (self, OwnershipPreference::PREFER_OWNED),
369                    );
370                    if self.is_sharded() {
371                        delayed_trace.mark_sharded();
372                        trace.mark_sharded();
373                    }
374
375                    z1feedback.connect_with_preference(
376                        &trace,
377                        OwnershipPreference::STRONGLY_PREFER_OWNED,
378                    );
379
380                    register_replay_stream(circuit, self, &replay_stream);
381
382                    circuit.cache_insert(DelayedTraceId::new(trace.stream_id()), delayed_trace);
383                    trace
384                })
385            })
386            .clone()
387    }
388
389    /// See [`Stream::integrate_trace_retain_keys`].
390    #[track_caller]
391    pub fn dyn_integrate_trace_retain_keys<TS>(
392        &self,
393        bounds_stream: &Stream<C, Box<TS>>,
394        retain_key_func: Box<dyn Fn(&TS) -> Filter<B::Key>>,
395    ) where
396        B: Batch<Time = ()>,
397        TS: DataTrait + ?Sized,
398        Box<TS>: Clone,
399    {
400        let bounds = self.trace_bounds();
401        bounds.set_unique_key_bound_name(bounds_stream.get_persistent_id().as_deref());
402        bounds_stream.inspect(move |ts| {
403            let filter = retain_key_func(ts.as_ref());
404            bounds.set_key_filter(filter);
405        });
406    }
407
408    /// See [`Stream::integrate_trace_retain_values`].
409    #[track_caller]
410    pub fn dyn_integrate_trace_retain_values<TS>(
411        &self,
412        bounds_stream: &Stream<C, Box<TS>>,
413        retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>,
414    ) where
415        B: Batch<Time = ()>,
416        TS: DataTrait + ?Sized,
417        Box<TS>: Clone,
418    {
419        let bounds = self.trace_bounds();
420        bounds.set_unique_val_bound_name(bounds_stream.get_persistent_id().as_deref());
421
422        bounds_stream.inspect(move |ts| {
423            let filter = GroupFilter::Simple(retain_val_func(ts.as_ref()));
424            bounds.set_val_filter(filter);
425        });
426    }
427
428    #[track_caller]
429    pub fn dyn_integrate_trace_retain_values_last_n<TS>(
430        &self,
431        bounds_stream: &Stream<C, Box<TS>>,
432        retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>,
433        n: usize,
434    ) where
435        B: Batch<Time = ()>,
436        TS: DataTrait + ?Sized,
437        Box<TS>: Clone,
438    {
439        let bounds = self.trace_bounds();
440        bounds.set_unique_val_bound_name(bounds_stream.get_persistent_id().as_deref());
441
442        bounds_stream.inspect(move |ts| {
443            let filter = GroupFilter::LastN(n, retain_val_func(ts.as_ref()));
444            bounds.set_val_filter(filter);
445        });
446    }
447
448    #[track_caller]
449    pub fn dyn_integrate_trace_retain_values_top_n<TS>(
450        &self,
451        val_factory: &'static dyn Factory<B::Val>,
452        bounds_stream: &Stream<C, Box<TS>>,
453        retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>,
454        n: usize,
455    ) where
456        B: Batch<Time = ()>,
457        TS: DataTrait + ?Sized,
458        Box<TS>: Clone,
459    {
460        let bounds = self.trace_bounds();
461        bounds.set_unique_val_bound_name(bounds_stream.get_persistent_id().as_deref());
462
463        bounds_stream.inspect(move |ts| {
464            let filter = GroupFilter::TopN(n, retain_val_func(ts.as_ref()), val_factory);
465            bounds.set_val_filter(filter);
466        });
467    }
468
469    /// Retrieves trace bounds for `self`, creating them if necessary.
470    ///
471    /// It's important that a single `TraceBounds` includes all of the bounds
472    /// relevant to a particular trace.  This can be tricky in the presence of
473    /// multiple versions of a stream that code tends to treat as the same.  We
474    /// manage it by mapping all of those versions to just one single version:
475    ///
476    /// * For a sharded version of some source stream, we use the source stream.
477    ///
478    /// * For a spilled version of some source stream, we use the source stream.
479    ///
480    /// Using the source stream is a safer choice than using the sharded (or
481    /// spilled) version, because it always exists, whereas the sharded version
482    /// might be created only *after* we get the trace bounds for the source
483    /// stream.
484    fn trace_bounds(&self) -> TraceBounds<B::Key, B::Val>
485    where
486        B: BatchReader,
487    {
488        // We handle moving from the sharded to unsharded stream directly here.
489        let stream_id = self.try_unsharded_version().stream_id();
490
491        self.circuit()
492            .cache_get_or_insert_with(BoundsId::<B>::new(stream_id), TraceBounds::new)
493            .clone()
494    }
495
496    /// Retrieves trace bounds for `self`, or a sharded version of `self` if it
497    /// exists, creating them if necessary, and adds bounds for
498    /// `lower_key_bound` and `lower_val_bound`.
499    fn trace_bounds_with_bound(
500        &self,
501        lower_key_bound: TraceBound<B::Key>,
502        lower_val_bound: TraceBound<B::Val>,
503    ) -> TraceBounds<B::Key, B::Val>
504    where
505        B: BatchReader,
506    {
507        let bounds = self.trace_bounds();
508        bounds.add_key_bound(lower_key_bound);
509        bounds.add_val_bound(lower_val_bound);
510        bounds
511    }
512
513    // TODO: this method should replace `Stream::integrate()`.
514    #[track_caller]
515    pub fn dyn_integrate_trace(&self, factories: &B::Factories) -> Stream<C, Spine<B>>
516    where
517        B: Batch<Time = ()>,
518        Spine<B>: SizeOf,
519    {
520        self.dyn_integrate_trace_with_bound(factories, TraceBound::new(), TraceBound::new())
521    }
522
523    pub fn dyn_integrate_trace_with_bound(
524        &self,
525        factories: &B::Factories,
526        lower_key_bound: TraceBound<B::Key>,
527        lower_val_bound: TraceBound<B::Val>,
528    ) -> Stream<C, Spine<B>>
529    where
530        B: Batch<Time = ()>,
531        Spine<B>: SizeOf,
532    {
533        self.integrate_trace_inner(
534            factories,
535            self.trace_bounds_with_bound(lower_key_bound, lower_val_bound),
536        )
537    }
538
539    #[allow(clippy::type_complexity)]
540    fn integrate_trace_inner(
541        &self,
542        input_factories: &B::Factories,
543        bounds: TraceBounds<B::Key, B::Val>,
544    ) -> Stream<C, Spine<B>>
545    where
546        B: Batch<Time = ()>,
547        Spine<B>: SizeOf,
548    {
549        self.circuit()
550            .cache_get_or_insert_with(TraceId::new(self.stream_id()), || {
551                let circuit = self.circuit();
552                let bounds = bounds.clone();
553
554                let persistent_id = self.get_persistent_id();
555
556                circuit.region("integrate_trace", || {
557                    let z1 = Z1Trace::new(
558                        input_factories,
559                        input_factories,
560                        true,
561                        circuit.root_scope(),
562                        bounds,
563                    );
564
565                    let (
566                        ExportStream {
567                            local: delayed_trace,
568                            export,
569                        },
570                        z1feedback,
571                    ) = circuit.add_feedback_with_export_persistent(
572                        persistent_id
573                            .map(|name| format!("{name}.integral"))
574                            .as_deref(),
575                        z1,
576                    );
577
578                    let replay_stream = z1feedback.operator_mut().prepare_replay_stream(self);
579
580                    let trace = circuit.add_binary_operator_with_preference(
581                        UntimedTraceAppend::<Spine<B>>::new(),
582                        (&delayed_trace, OwnershipPreference::STRONGLY_PREFER_OWNED),
583                        (self, OwnershipPreference::PREFER_OWNED),
584                    );
585
586                    if self.is_sharded() {
587                        delayed_trace.mark_sharded();
588                        trace.mark_sharded();
589                    }
590
591                    z1feedback.connect_with_preference(
592                        &trace,
593                        OwnershipPreference::STRONGLY_PREFER_OWNED,
594                    );
595
596                    register_replay_stream(circuit, self, &replay_stream);
597
598                    circuit.cache_insert(DelayedTraceId::new(trace.stream_id()), delayed_trace);
599                    circuit.cache_insert(ExportId::new(trace.stream_id()), export);
600
601                    trace
602                })
603            })
604            .clone()
605    }
606}
607
608/// See [`trait TraceFeedback`] documentation.
609pub struct TraceFeedbackConnector<C, T>
610where
611    C: Circuit,
612    T: Trace,
613{
614    feedback: FeedbackConnector<C, T, T, Z1Trace<C, T::Batch, T>>,
615    /// `delayed_trace` stream in the diagram in
616    /// [`trait TraceFeedback`] documentation.
617    pub delayed_trace: Stream<C, T>,
618    export_trace: Stream<C::Parent, T>,
619    bounds: TraceBounds<T::Key, T::Val>,
620}
621
impl<C, T> TraceFeedbackConnector<C, T>
where
    T: Trace<Time = ()> + Clone,
    C: Circuit,
{
    /// Closes the feedback loop by integrating `stream` into the trace.
    ///
    /// This method:
    /// 1. Appends each batch of `stream` to `delayed_trace` with an
    ///    [`UntimedTraceAppend`] operator.
    /// 2. Feeds the result back into the `Z1Trace` feedback operator.
    /// 3. Registers the delayed trace, trace, bounds, and export in the
    ///    circuit cache, so later lookups for `stream` reuse them.
    pub fn connect(self, stream: &Stream<C, T::Batch>) {
        let circuit = self.delayed_trace.circuit();

        let replay_stream = self.feedback.operator_mut().prepare_replay_stream(stream);

        let trace = circuit.add_binary_operator_with_preference(
            <UntimedTraceAppend<T>>::new(),
            (
                &self.delayed_trace,
                OwnershipPreference::STRONGLY_PREFER_OWNED,
            ),
            (stream, OwnershipPreference::PREFER_OWNED),
        );

        // Propagate sharding of the input to both trace streams.
        if stream.is_sharded() {
            self.delayed_trace.mark_sharded();
            trace.mark_sharded();
        }

        self.feedback
            .connect_with_preference(&trace, OwnershipPreference::STRONGLY_PREFER_OWNED);

        register_replay_stream(circuit, stream, &replay_stream);

        circuit.cache_insert(
            DelayedTraceId::new(trace.stream_id()),
            self.delayed_trace.clone(),
        );

        circuit.cache_insert(TraceId::new(stream.stream_id()), trace.clone());
        // NOTE(review): `Stream::trace_bounds` looks bounds up under
        // `try_unsharded_version().stream_id()`, but they are inserted here
        // under `stream.stream_id()`.  If `stream` can be a sharded version of
        // another stream, these keys differ.  Confirm this is intended.
        circuit.cache_insert(
            BoundsId::<T::Batch>::new(stream.stream_id()),
            self.bounds.clone(),
        );
        circuit.cache_insert(ExportId::new(trace.stream_id()), self.export_trace);
    }
}
664
665/// Extension trait to trait [`Circuit`] that provides a convenience API
666/// to construct circuits of the following shape:
667///
668/// ```text
669///  external inputs    ┌───┐    output    ┌──────────────────┐
670/// ───────────────────►│ F ├─────────────►│UntimedTraceAppend├───┐
671///                     └───┘              └──────────────────┘   │
672///                       ▲                      ▲                │trace
673///                       │                      │                │
674///                       │    delayed_trace   ┌─┴──┐             │
675///                       └────────────────────┤Z^-1│◄────────────┘
676///                                            └────┘
677/// ```
678/// where `F` is an operator that consumes an integral of its own output
679/// stream.
680///
681/// Use this method to create a
682/// [`TraceFeedbackConnector`] struct.  The struct contains the `delayed_trace`
683/// stream, which can be used as input to instantiate `F` and the `output`
684/// stream.  Close the loop by calling
685/// `TraceFeedbackConnector::connect(output)`.
686pub trait TraceFeedback: Circuit {
687    fn add_integrate_trace_feedback<T>(
688        &self,
689        persistent_id: Option<&str>,
690        factories: &T::Factories,
691        bounds: TraceBounds<T::Key, T::Val>,
692    ) -> TraceFeedbackConnector<Self, T>
693    where
694        T: Trace<Time = ()> + Clone,
695    {
696        // We'll give `Z1Trace` a real name inside `TraceFeedbackConnector::connect`, where we have the name of the input stream.
697        let (ExportStream { local, export }, feedback) = self.add_feedback_with_export_persistent(
698            persistent_id
699                .map(|name| format!("{name}.integral"))
700                .as_deref(),
701            Z1Trace::new(
702                factories,
703                factories,
704                true,
705                self.root_scope(),
706                bounds.clone(),
707            ),
708        );
709
710        TraceFeedbackConnector {
711            feedback,
712            delayed_trace: local,
713            export_trace: export,
714            bounds,
715        }
716    }
717}
718
719impl<C: Circuit> TraceFeedback for C {}
720
721impl<C, B> Stream<C, Spine<B>>
722where
723    C: Circuit,
724    B: Batch,
725{
726    pub fn delay_trace(&self) -> Stream<C, SpineSnapshot<B>> {
727        // The delayed trace should be automatically created while the real trace is
728        // created via `.trace()` or a similar function
729        // FIXME: Create a trace if it doesn't exist
730        let delayed_trace = self
731            .circuit()
732            .cache_get_or_insert_with(DelayedTraceId::new(self.stream_id()), || {
733                panic!("called `.delay_trace()` on a stream without a previously created trace")
734            })
735            .deref()
736            .clone();
737        delayed_trace.apply(|spine: &Spine<B>| spine.ro_snapshot())
738    }
739}
740
741pub struct UntimedTraceAppend<T>
742where
743    T: Trace,
744{
745    // Total number of input tuples processed by the operator.
746    num_inputs: usize,
747
748    _phantom: PhantomData<T>,
749}
750
751impl<T> Default for UntimedTraceAppend<T>
752where
753    T: Trace,
754{
755    fn default() -> Self {
756        Self::new()
757    }
758}
759
760impl<T> UntimedTraceAppend<T>
761where
762    T: Trace,
763{
764    pub fn new() -> Self {
765        Self {
766            num_inputs: 0,
767            _phantom: PhantomData,
768        }
769    }
770}
771
772impl<T> Operator for UntimedTraceAppend<T>
773where
774    T: Trace + 'static,
775{
776    fn name(&self) -> Cow<'static, str> {
777        Cow::from("UntimedTraceAppend")
778    }
779
780    fn metadata(&self, meta: &mut OperatorMeta) {
781        meta.extend(metadata! {
782            INPUT_RECORDS_COUNT => MetaItem::Count(self.num_inputs),
783        });
784    }
785
786    fn fixedpoint(&self, _scope: Scope) -> bool {
787        true
788    }
789}
790
791impl<T> BinaryOperator<T, T::Batch, T> for UntimedTraceAppend<T>
792where
793    T: Trace + 'static,
794{
795    async fn eval(&mut self, _trace: &T, _batch: &T::Batch) -> T {
796        // Refuse to accept trace by reference.  This should not happen in a correctly
797        // constructed circuit.
798        panic!("UntimedTraceAppend::eval(): cannot accept trace by reference")
799    }
800
801    async fn eval_owned_and_ref(&mut self, mut trace: T, batch: &T::Batch) -> T {
802        self.num_inputs += batch.len();
803        trace.insert(batch.clone()).await;
804        trace
805    }
806
807    async fn eval_ref_and_owned(&mut self, _trace: &T, _batch: T::Batch) -> T {
808        // Refuse to accept trace by reference.  This should not happen in a correctly
809        // constructed circuit.
810        panic!("UntimedTraceAppend::eval_ref_and_owned(): cannot accept trace by reference")
811    }
812
813    async fn eval_owned(&mut self, mut trace: T, batch: T::Batch) -> T {
814        self.num_inputs += batch.len();
815
816        trace.insert(batch).await;
817        trace
818    }
819
820    fn input_preference(&self) -> (OwnershipPreference, OwnershipPreference) {
821        (
822            OwnershipPreference::PREFER_OWNED,
823            OwnershipPreference::PREFER_OWNED,
824        )
825    }
826}
827
828pub struct TraceAppend<T: Trace, B: BatchReader, C> {
829    clock: C,
830    output_factories: T::Factories,
831
832    // Total number of input tuples processed by the operator.
833    num_inputs: usize,
834
835    _phantom: PhantomData<(T, B)>,
836}
837
838impl<T: Trace, B: BatchReader, C> TraceAppend<T, B, C> {
839    pub fn new(output_factories: &T::Factories, clock: C) -> Self {
840        Self {
841            clock,
842            output_factories: output_factories.clone(),
843            num_inputs: 0,
844            _phantom: PhantomData,
845        }
846    }
847}
848
849impl<T, B, Clk> Operator for TraceAppend<T, B, Clk>
850where
851    T: Trace,
852    B: BatchReader,
853    Clk: 'static,
854{
855    fn name(&self) -> Cow<'static, str> {
856        Cow::from("TraceAppend")
857    }
858    fn fixedpoint(&self, _scope: Scope) -> bool {
859        true
860    }
861
862    fn metadata(&self, meta: &mut OperatorMeta) {
863        meta.extend(metadata! {
864            INPUT_RECORDS_COUNT => MetaItem::Count(self.num_inputs),
865        });
866    }
867}
868
869impl<T, B, Clk> BinaryOperator<T, B, T> for TraceAppend<T, B, Clk>
870where
871    B: BatchReader<Time = ()>,
872    Clk: WithClock + 'static,
873    T: Trace<Key = B::Key, Val = B::Val, R = B::R, Time = Clk::Time>,
874{
875    async fn eval(&mut self, _trace: &T, _batch: &B) -> T {
876        // Refuse to accept trace by reference.  This should not happen in a correctly
877        // constructed circuit.
878        unimplemented!()
879    }
880
881    async fn eval_owned_and_ref(&mut self, mut trace: T, batch: &B) -> T {
882        // TODO: extend `trace` type to feed untimed batches directly
883        // (adding fixed timestamp on the fly).
884        self.num_inputs += batch.len();
885        trace
886            .insert(T::Batch::from_batch(
887                batch,
888                &self.clock.time(),
889                &self.output_factories,
890            ))
891            .await;
892        trace
893    }
894
895    async fn eval_ref_and_owned(&mut self, _trace: &T, _batch: B) -> T {
896        // Refuse to accept trace by reference.  This should not happen in a correctly
897        // constructed circuit.
898        unimplemented!()
899    }
900
901    async fn eval_owned(&mut self, mut trace: T, batch: B) -> T {
902        self.num_inputs += batch.len();
903
904        if TypeId::of::<B>() == TypeId::of::<T::Batch>() {
905            let mut batch = Some(batch);
906            let batch = unsafe { transmute::<&mut Option<B>, &mut Option<T::Batch>>(&mut batch) };
907            trace.insert(batch.take().unwrap()).await;
908        } else {
909            trace
910                .insert(T::Batch::from_batch(
911                    &batch,
912                    &self.clock.time(),
913                    &self.output_factories,
914                ))
915                .await;
916        }
917        trace
918    }
919
920    fn input_preference(&self) -> (OwnershipPreference, OwnershipPreference) {
921        (
922            OwnershipPreference::PREFER_OWNED,
923            OwnershipPreference::PREFER_OWNED,
924        )
925    }
926}
927
// State of an in-progress replay in `Z1Trace`: the trace being replayed, together with a
// merge cursor over that same trace.
//
// `#[self_referencing]` (from `ouroboros`) allows `cursor` to borrow from `trace` while both
// are stored in the same struct.
#[self_referencing]
struct ReplayState<T: Trace> {
    // The trace whose contents are being replayed.
    trace: T,
    // Cursor over `trace`; `Z1Trace::get_output` advances it one chunk per call.
    #[borrows(trace)]
    #[covariant]
    cursor: Box<dyn MergeCursor<T::Key, T::Val, T::Time, T::R> + Send + 'this>,
}
935
936impl<T: Trace> ReplayState<T> {
937    fn create(trace: T) -> Self {
938        ReplayStateBuilder {
939            trace,
940            cursor_builder: |trace| trace.merge_cursor(None, None),
941        }
942        .build()
943    }
944}
945
/// Z-1 operator over traces.
///
/// Stores the trace it receives in `eval_strict_owned` (after applying the retainment
/// `bounds`) and hands it back from `get_output`. When wired up with `prepare_replay_stream`,
/// it can also replay the contents of its trace, chunk by chunk, to a separate stream.
pub struct Z1Trace<C: Circuit, B: Batch, T: Trace> {
    // For error reporting.
    global_id: GlobalNodeId,
    // Current timestamp; advanced in `clock_end` and when a pending input flush is processed
    // by `eval_strict_owned`.
    time: T::Time,
    // The stored trace. Taken by `get_output`; set again by `eval_strict_owned`.
    trace: Option<T>,
    // Replay in progress, if any: the trace being replayed plus a cursor into it.
    replay_state: Option<ReplayState<T>>,
    // Set by `start_replay`, cleared by `end_replay`.
    replay_mode: bool,
    // Factories used to create new, empty traces.
    trace_factories: T::Factories,
    // `dirty[scope]` is `true` iff at least one non-empty update was added to the trace
    // since the previous clock cycle at level `scope`.
    dirty: Vec<bool>,
    // Highest scope tracked; `dirty` holds one flag per scope in `0..=root_scope`.
    root_scope: Scope,
    // When `false`, `clock_end` sets the trace frontier when `scope + 1 == root_scope`.
    // `get_final_output` asserts that this is `true`.
    reset_on_clock_start: bool,
    // Retainment bounds applied to every trace received by `eval_strict_owned`.
    bounds: TraceBounds<T::Key, T::Val>,

    // Factories for the batches written to `delta_stream` during replay.
    batch_factories: B::Factories,
    // Stream whose integral this Z1 operator stores, if any. Set by `prepare_replay_stream`;
    // replayed chunks are written to it.
    delta_stream: Option<Stream<C, B>>,
    // Set by `flush`; cleared by `get_output` once the output half of the flush is done.
    flush_output: bool,
    // Set by `flush_input`; cleared by `eval_strict_owned`, which then advances `time`.
    flush_input: bool,
}
968
969impl<C, B, T> Z1Trace<C, B, T>
970where
971    C: Circuit,
972    B: Batch,
973    T: Trace,
974{
975    pub fn new(
976        trace_factories: &T::Factories,
977        batch_factories: &B::Factories,
978        reset_on_clock_start: bool,
979        root_scope: Scope,
980        bounds: TraceBounds<T::Key, T::Val>,
981    ) -> Self {
982        Self {
983            global_id: GlobalNodeId::root(),
984            time: <T::Time as Timestamp>::clock_start(),
985            trace: None,
986            replay_state: None,
987            replay_mode: false,
988            trace_factories: trace_factories.clone(),
989            batch_factories: batch_factories.clone(),
990            dirty: vec![false; root_scope as usize + 1],
991            root_scope,
992            reset_on_clock_start,
993            bounds,
994            delta_stream: None,
995            flush_output: false,
996            flush_input: false,
997        }
998    }
999
1000    /// Creates a stream that will be used to replay the contents of `stream`.
1001    ///
1002    /// Given a circuit that implements an integral, the Z-1 operator can be used
1003    /// to replay the `delta` stream during bootstrapping.  This function sets this
1004    /// up at circuit construction time. It creates a new stream (`replay_stream`)
1005    /// that aliases `stream` internally.  In replay mode, Z-1 will send the contents
1006    /// of the integral to `replay_stream` chunk-by-chunk.
1007    ///
1008    ///   │stream
1009    ///   │
1010    ///   │
1011    ///   │◄............
1012    ///   │            .replay_stream
1013    ///   ▼            .
1014    /// ┌───┐        ┌───┐
1015    /// │ + ├───────►│Z-1│
1016    /// └───┘        └─┬─┘
1017    ///   ▲            │
1018    ///   │            │
1019    ///   └────────────┘
1020    ///
1021    /// Note that at most one of `stream` or `replay_stream` can be active at a time.
1022    /// During normal operation, `stream` is active and `replay_stream` is not.  During
1023    /// replay, `replay_stream` is active while the operator that normally write to
1024    /// stream is disabled.
1025    pub fn prepare_replay_stream(&mut self, stream: &Stream<C, B>) -> Stream<C, B> {
1026        let replay_stream = Stream::with_value(
1027            stream.circuit().clone(),
1028            self.global_id.local_node_id().unwrap(),
1029            stream.value(),
1030        );
1031
1032        self.delta_stream = Some(replay_stream.clone());
1033        replay_stream
1034    }
1035}
1036
1037impl<C, B, T> Operator for Z1Trace<C, B, T>
1038where
1039    C: Circuit,
1040    B: Batch,
1041    T: Trace,
1042{
1043    fn name(&self) -> Cow<'static, str> {
1044        Cow::from("Z1 (trace)")
1045    }
1046
1047    fn clock_start(&mut self, scope: Scope) {
1048        self.dirty[scope as usize] = false;
1049
1050        if scope == 0 && self.trace.is_none() {
1051            self.trace = Some(T::new(&self.trace_factories));
1052        }
1053    }
1054
1055    fn clock_end(&mut self, scope: Scope) {
1056        if scope + 1 == self.root_scope
1057            && !self.reset_on_clock_start
1058            && let Some(tr) = self.trace.as_mut()
1059        {
1060            tr.set_frontier(&self.time.epoch_start(scope));
1061        }
1062        self.time = self.time.advance(scope + 1);
1063    }
1064
1065    fn init(&mut self, global_id: &GlobalNodeId) {
1066        self.global_id = global_id.clone();
1067    }
1068
1069    fn metadata(&self, meta: &mut OperatorMeta) {
1070        let total_size = self
1071            .trace
1072            .as_ref()
1073            .map(|trace| trace.num_entries_deep())
1074            .unwrap_or(0);
1075
1076        let bytes = self
1077            .trace
1078            .as_ref()
1079            .map(|trace| trace.size_of())
1080            .unwrap_or_default();
1081
1082        meta.extend(metadata! {
1083            STATE_RECORDS_COUNT => MetaItem::Count(total_size),
1084            ALLOCATED_MEMORY_BYTES => MetaItem::bytes(bytes.total_bytes()),
1085            USED_MEMORY_BYTES => MetaItem::bytes(bytes.used_bytes()),
1086            MEMORY_ALLOCATIONS_COUNT => MetaItem::Count(bytes.distinct_allocations()),
1087            SHARED_MEMORY_BYTES => MetaItem::bytes(bytes.shared_bytes()),
1088            RETAINMENT_BOUNDS => self.bounds.metadata()
1089        });
1090
1091        if let Some(trace) = self.trace.as_ref() {
1092            trace.metadata(meta);
1093        }
1094    }
1095
1096    fn fixedpoint(&self, scope: Scope) -> bool {
1097        !self.dirty[scope as usize] && self.replay_state.is_none()
1098    }
1099
1100    fn checkpoint(
1101        &mut self,
1102        base: &StoragePath,
1103        pid: Option<&str>,
1104        files: &mut Vec<Arc<dyn FileCommitter>>,
1105    ) -> Result<(), Error> {
1106        let pid = require_persistent_id(pid, &self.global_id)?;
1107        self.trace
1108            .as_mut()
1109            .map(|trace| trace.save(base, pid, files))
1110            .unwrap_or(Ok(()))
1111    }
1112
1113    fn restore(&mut self, base: &StoragePath, pid: Option<&str>) -> Result<(), Error> {
1114        let pid = require_persistent_id(pid, &self.global_id)?;
1115
1116        self.trace
1117            .as_mut()
1118            .map(|trace| trace.restore(base, pid))
1119            .unwrap_or(Ok(()))
1120    }
1121
1122    fn clear_state(&mut self) -> Result<(), Error> {
1123        // println!("Z1Trace-{}::clear_state", &self.global_id);
1124        self.trace = Some(T::new(&self.trace_factories));
1125        self.replay_state = None;
1126        self.dirty = vec![false; self.root_scope as usize + 1];
1127
1128        Ok(())
1129    }
1130
1131    fn start_replay(&mut self) -> Result<(), Error> {
1132        // The second condition is necessary if `start_replay` is called twice, for the input
1133        // and output halves of Z1.
1134        // println!(
1135        //     "Z1Trace-{}::start_replay delta_stream: {:?}",
1136        //     &self.global_id,
1137        //     self.delta_stream.is_some()
1138        // );
1139        self.replay_mode = true;
1140        if self.delta_stream.is_some() && self.replay_state.is_none() {
1141            let trace = self.trace.take().expect("Z1Trace::start_replay: no trace");
1142            self.trace = Some(T::new(&self.trace_factories));
1143
1144            //println!("Z1Trace-{}::initializing replay_state", &self.global_id);
1145
1146            self.replay_state = Some(ReplayState::create(trace));
1147        }
1148
1149        Ok(())
1150    }
1151
1152    fn is_replay_complete(&self) -> bool {
1153        self.replay_state.is_none()
1154    }
1155
1156    fn end_replay(&mut self) -> Result<(), Error> {
1157        //println!("Z1Trace-{}::end_replay", &self.global_id);
1158        self.replay_mode = false;
1159        self.replay_state = None;
1160
1161        Ok(())
1162    }
1163
1164    fn flush(&mut self) {
1165        self.flush_output = true;
1166    }
1167
1168    fn is_flush_complete(&self) -> bool {
1169        !self.flush_output
1170    }
1171
1172    fn start_compaction(&mut self) {
1173        if let Some(trace) = self.trace.as_mut() {
1174            trace.initiate_compaction()
1175        }
1176    }
1177}
1178
1179impl<C, B, T> StrictOperator<T> for Z1Trace<C, B, T>
1180where
1181    C: Circuit,
1182    B: Batch<Key = T::Key, Val = T::Val, Time = (), R = T::R>,
1183    T: Trace,
1184{
1185    fn get_output(&mut self) -> T {
1186        //println!("Z1-{}::get_output", &self.global_id);
1187        let replay_step_size = splitter_output_chunk_size();
1188
1189        if self.replay_mode {
1190            // One output per transaction.
1191            if self.flush_output
1192                && let Some(replay) = &mut self.replay_state
1193            {
1194                //println!("Z1-{}::get_output: replaying", &self.global_id);
1195                let mut builder = <B::Builder as Builder<B>>::with_capacity(
1196                    &self.batch_factories,
1197                    replay_step_size,
1198                    replay_step_size,
1199                );
1200
1201                let mut num_values = 0;
1202                let mut weight = self.batch_factories.weight_factory().default_box();
1203
1204                while replay.borrow_cursor().key_valid() && num_values < replay_step_size {
1205                    let mut values_added = false;
1206                    while replay.borrow_cursor().val_valid() && num_values < replay_step_size {
1207                        weight.set_zero();
1208                        replay.with_cursor_mut(|cursor| {
1209                            cursor.map_times(&mut |_t, w| weight.add_assign(w))
1210                        });
1211
1212                        if !weight.is_zero() {
1213                            builder.push_val_diff(replay.borrow_cursor().val(), weight.as_ref());
1214                            values_added = true;
1215                            num_values += 1;
1216                        }
1217                        replay.with_cursor_mut(|cursor| cursor.step_val());
1218                    }
1219                    if values_added {
1220                        builder.push_key(replay.borrow_cursor().key());
1221                    }
1222                    if !replay.borrow_cursor().val_valid() {
1223                        replay.with_cursor_mut(|cursor| cursor.step_key());
1224                    }
1225                }
1226
1227                let batch = builder.done();
1228                self.delta_stream.as_ref().unwrap().value().put(batch);
1229                if !replay.borrow_cursor().key_valid() {
1230                    self.replay_state = None;
1231                    self.flush_output = false;
1232                }
1233            } else {
1234                // Continue producing empty outputs as long as the circuit is in the replay mode.
1235                self.delta_stream
1236                    .as_ref()
1237                    .unwrap()
1238                    .value()
1239                    .put(B::dyn_empty(&self.batch_factories));
1240                self.flush_output = false;
1241            }
1242        } else {
1243            self.flush_output = false;
1244        }
1245
1246        let mut result = self.trace.take().unwrap();
1247        result.clear_dirty_flag();
1248        result
1249    }
1250
1251    fn get_final_output(&mut self) -> T {
1252        // We only create the operator using `add_feedback_with_export` if
1253        // `reset_on_clock_start` is true, so this should never get invoked
1254        // otherwise.
1255        assert!(self.reset_on_clock_start);
1256        self.get_output()
1257    }
1258}
1259
1260impl<C, B, T> StrictUnaryOperator<T, T> for Z1Trace<C, B, T>
1261where
1262    C: Circuit,
1263    B: Batch<Key = T::Key, Val = T::Val, Time = (), R = T::R>,
1264    T: Trace,
1265{
1266    fn flush_input(&mut self) {
1267        self.flush_input = true;
1268    }
1269
1270    fn is_flush_input_complete(&self) -> bool {
1271        !self.flush_input
1272    }
1273
1274    async fn eval_strict(&mut self, _i: &T) {
1275        unimplemented!()
1276    }
1277
1278    async fn eval_strict_owned(&mut self, mut i: T) {
1279        // println!("Z1-{}::eval_strict_owned", &self.global_id);
1280
1281        if self.flush_input {
1282            self.time = self.time.advance(0);
1283            self.flush_input = false;
1284        }
1285
1286        let dirty = i.dirty();
1287
1288        if let Some(filter) = self.bounds.effective_key_filter() {
1289            i.retain_keys(filter);
1290        }
1291
1292        if let Some(filter) = self.bounds.effective_val_filter() {
1293            i.retain_values(filter);
1294        }
1295
1296        self.trace = Some(i);
1297
1298        self.dirty[0] = dirty;
1299        if dirty {
1300            self.dirty.fill(true);
1301        }
1302    }
1303
1304    fn input_preference(&self) -> OwnershipPreference {
1305        OwnershipPreference::PREFER_OWNED
1306    }
1307}
1308
1309#[cfg(test)]
1310mod test {
1311    use std::{
1312        cmp::max,
1313        collections::{BTreeMap, BTreeSet},
1314        thread,
1315        time::{Duration, Instant},
1316    };
1317
1318    use crate::{
1319        OrdIndexedZSet, Runtime, Stream, TypedBox, ZWeight,
1320        circuit::{
1321            CircuitConfig, GlobalNodeId,
1322            metadata::{MetaItem, SPINE_BATCHES_COUNT},
1323        },
1324        dynamic::DynData,
1325        typed_batch::{self, IndexedZSetReader, Spine},
1326        utils::Tup2,
1327    };
1328    use proptest::{collection::vec, prelude::*};
1329    use size_of::SizeOf;
1330
1331    fn quasi_monotone_batches(
1332        key_window_size: i32,
1333        key_window_step: i32,
1334        val_window_size: i32,
1335        val_window_step: i32,
1336        max_tuples: usize,
1337        batches: usize,
1338    ) -> impl Strategy<Value = Vec<Vec<((i32, i32), ZWeight)>>> {
1339        (0..batches)
1340            .map(|i| {
1341                vec(
1342                    (
1343                        (
1344                            i as i32 * key_window_step
1345                                ..i as i32 * key_window_step + key_window_size,
1346                            i as i32 * val_window_step
1347                                ..i as i32 * val_window_step + val_window_size,
1348                        ),
1349                        1..2i64,
1350                    ),
1351                    0..max_tuples,
1352                )
1353            })
1354            .collect::<Vec<_>>()
1355    }
1356
1357    /// Test that `integrate_trace_retain_XXX` functions don't discard more than
1358    /// they are supposed to.
1359    //
1360    // TODO: this test used to also check that the memory footprint of the collection
1361    // is bounded, but this check was flaky, since GC is lazy.
1362    fn test_integrate_trace_retain(
1363        batches: Vec<Vec<((i32, i32), ZWeight)>>,
1364        key_lateness: i32,
1365        val_lateness: i32,
1366    ) {
1367        let (mut dbsp, input_handle) = Runtime::init_circuit(4, move |circuit| {
1368            let (stream, handle) = circuit.add_input_indexed_zset::<i32, i32>();
1369            let stream = stream.shard();
1370            let watermark: Stream<_, TypedBox<(i32, i32), DynData>> = stream.waterline(
1371                || (i32::MIN, i32::MIN),
1372                |k, v| (*k, *v),
1373                |(ts1_left, ts2_left), (ts1_right, ts2_right)| {
1374                    (max(*ts1_left, *ts1_right), max(*ts2_left, *ts2_right))
1375                },
1376            );
1377
1378            // Track all records in `stream` (this integral is not GC'd).
1379            let trace = stream.integrate_trace();
1380
1381            // Test integrate_trace_retain_keys.
1382            let stream1 = stream.map_index(|(k, v)| (*k, *v)).shard();
1383            let trace1 = stream.integrate_trace();
1384            stream1.integrate_trace_retain_keys(&watermark, move |key, ts| {
1385                *key >= ts.0.saturating_sub(key_lateness)
1386            });
1387
1388            trace1.apply(|trace| {
1389                // println!("retain_keys: {}bytes", trace.size_of().total_bytes());
1390                assert!(trace.size_of().total_bytes() < 100_000);
1391            });
1392
1393            // Test `integrate_trace_retain_values`.
1394            let stream2 = stream.map_index(|(k, v)| (*k, *v)).shard();
1395
1396            let trace2 = stream2.integrate_trace();
1397            stream2.integrate_trace_retain_values(&watermark, move |val, ts| {
1398                *val >= ts.1.saturating_sub(val_lateness)
1399            });
1400
1401            trace2.apply(|trace| {
1402                // println!("retain_vals: {}bytes", trace.size_of().total_bytes());
1403                assert!(trace.size_of().total_bytes() < 100_000);
1404            });
1405
1406            // Test `integrate_trace_retain_values_last_n(1)`.
1407            let stream3 = stream.map_index(|(k, v)| (*k, *v)).shard();
1408
1409            let trace3 = stream3.integrate_trace();
1410            stream3.integrate_trace_retain_values_last_n(
1411                &watermark,
1412                move |val, ts| *val >= ts.1.saturating_sub(val_lateness),
1413                1,
1414            );
1415
1416            // Test `integrate_trace_retain_values_last_n(3)`.
1417            let stream4 = stream.map_index(|(k, v)| (*k, *v)).shard();
1418
1419            let trace4 = stream4.integrate_trace();
1420            stream4.integrate_trace_retain_values_last_n(
1421                &watermark,
1422                move |val, ts| *val >= ts.1.saturating_sub(val_lateness),
1423                3,
1424            );
1425
1426            // Test `integrate_trace_retain_values_top_n(1)`.
1427            let stream5 = stream.map_index(|(k, v)| (*k, *v)).shard();
1428
1429            let trace5 = stream5.integrate_trace();
1430            stream5.integrate_trace_retain_values_top_n(
1431                &watermark,
1432                move |val, ts| *val >= ts.1.saturating_sub(val_lateness),
1433                1,
1434            );
1435
1436            // Test `integrate_trace_retain_values_top_n(3)`.
1437            let stream6 = stream.map_index(|(k, v)| (*k, *v)).shard();
1438
1439            let trace6 = stream6.integrate_trace();
1440            stream6.integrate_trace_retain_values_top_n(
1441                &watermark,
1442                move |val, ts| *val >= ts.1.saturating_sub(val_lateness),
1443                1,
1444            );
1445
1446            // Validate outputs.
1447            trace.apply3(&trace1, &watermark, move |trace, trace1, ts| {
1448                let expected = typed_batch::IndexedZSetReader::iter(trace.as_ref())
1449                    .filter(|(k, _v, _w)| *k >= ts.0.saturating_sub(key_lateness))
1450                    .collect::<BTreeSet<_>>();
1451                let actual =
1452                    typed_batch::IndexedZSetReader::iter(trace1.as_ref()).collect::<BTreeSet<_>>();
1453
1454                let missing = expected.difference(&actual).collect::<Vec<_>>();
1455                assert!(
1456                    missing.is_empty(),
1457                    "missing tuples in trace1: {:?}",
1458                    missing
1459                );
1460            });
1461
1462            trace.apply3(&trace2, &watermark, move |trace, trace2, ts| {
1463                let expected = typed_batch::IndexedZSetReader::iter(trace.as_ref())
1464                    .filter(|(_k, v, _w)| *v >= ts.1.saturating_sub(val_lateness))
1465                    .collect::<BTreeSet<_>>();
1466                let actual =
1467                    typed_batch::IndexedZSetReader::iter(trace2.as_ref()).collect::<BTreeSet<_>>();
1468
1469                let missing = expected.difference(&actual).collect::<Vec<_>>();
1470                assert!(
1471                    missing.is_empty(),
1472                    "missing tuples in trace2: {:?}",
1473                    missing
1474                );
1475            });
1476
1477            fn test_retain_values_last_n(
1478                trace: &Spine<OrdIndexedZSet<i32, i32>>,
1479                watermark: &TypedBox<(i32, i32), DynData>,
1480                val_lateness: i32,
1481                n: usize,
1482            ) -> BTreeSet<(i32, i32, ZWeight)> {
1483                let mut all_tuples = BTreeMap::new();
1484                typed_batch::IndexedZSetReader::iter(trace).for_each(|(k, v, w)| {
1485                    all_tuples
1486                        .entry(k)
1487                        .or_insert_with(|| Vec::new())
1488                        .push((v, w))
1489                });
1490                let mut expected = BTreeSet::new();
1491                for (k, mut tuples) in all_tuples.into_iter() {
1492                    let index = tuples
1493                        .iter()
1494                        .position(|(v, _w)| *v >= watermark.1.saturating_sub(val_lateness))
1495                        .unwrap_or(tuples.len());
1496                    let first_index = index.saturating_sub(n);
1497                    tuples.drain(first_index..).for_each(|(v, w)| {
1498                        let _ = expected.insert((k, v, w));
1499                    });
1500                }
1501                expected
1502            }
1503
1504            fn test_retain_values_top_n(
1505                trace: &Spine<OrdIndexedZSet<i32, i32>>,
1506                watermark: &TypedBox<(i32, i32), DynData>,
1507                val_lateness: i32,
1508                n: usize,
1509            ) -> BTreeSet<(i32, i32, ZWeight)> {
1510                let mut all_tuples = BTreeMap::new();
1511                typed_batch::IndexedZSetReader::iter(trace).for_each(|(k, v, w)| {
1512                    all_tuples
1513                        .entry(k)
1514                        .or_insert_with(|| Vec::new())
1515                        .push((v, w))
1516                });
1517
1518                let mut expected = BTreeSet::new();
1519                for (k, mut tuples) in all_tuples.into_iter() {
1520                    tuples.retain(|(v, _w)| *v >= watermark.1.saturating_sub(val_lateness));
1521                    let first_index = tuples.len().saturating_sub(n);
1522                    tuples.drain(first_index..).for_each(|(v, w)| {
1523                        let _ = expected.insert((k, v, w));
1524                    });
1525                }
1526                expected
1527            }
1528
1529            trace.apply3(&trace3, &watermark, move |trace, trace3, ts| {
1530                let mut all_tuples = BTreeMap::new();
1531                typed_batch::IndexedZSetReader::iter(trace.as_ref()).for_each(|(k, v, w)| {
1532                    all_tuples
1533                        .entry(k)
1534                        .or_insert_with(|| Vec::new())
1535                        .push((v, w))
1536                });
1537
1538                let expected =
1539                    test_retain_values_last_n(trace.as_ref(), ts.as_ref(), val_lateness, 1);
1540
1541                let actual =
1542                    typed_batch::IndexedZSetReader::iter(trace3.as_ref()).collect::<BTreeSet<_>>();
1543
1544                let missing = expected.difference(&actual).collect::<Vec<_>>();
1545                assert!(
1546                    missing.is_empty(),
1547                    "missing tuples in trace3: {:?}",
1548                    missing
1549                );
1550            });
1551
1552            trace.apply3(&trace4, &watermark, move |trace, trace4, ts| {
1553                let mut all_tuples = BTreeMap::new();
1554                typed_batch::IndexedZSetReader::iter(trace.as_ref()).for_each(|(k, v, w)| {
1555                    all_tuples
1556                        .entry(k)
1557                        .or_insert_with(|| Vec::new())
1558                        .push((v, w))
1559                });
1560
1561                let expected =
1562                    test_retain_values_last_n(trace.as_ref(), ts.as_ref(), val_lateness, 3);
1563
1564                let actual =
1565                    typed_batch::IndexedZSetReader::iter(trace4.as_ref()).collect::<BTreeSet<_>>();
1566
1567                let missing = expected.difference(&actual).collect::<Vec<_>>();
1568                assert!(
1569                    missing.is_empty(),
1570                    "missing tuples in trace4: {:?}",
1571                    missing
1572                );
1573            });
1574
1575            trace.apply3(&trace5, &watermark, move |trace, trace5, ts| {
1576                let mut all_tuples = BTreeMap::new();
1577                typed_batch::IndexedZSetReader::iter(trace.as_ref()).for_each(|(k, v, w)| {
1578                    all_tuples
1579                        .entry(k)
1580                        .or_insert_with(|| Vec::new())
1581                        .push((v, w))
1582                });
1583
1584                let expected =
1585                    test_retain_values_top_n(trace.as_ref(), ts.as_ref(), val_lateness, 1);
1586
1587                let actual =
1588                    typed_batch::IndexedZSetReader::iter(trace5.as_ref()).collect::<BTreeSet<_>>();
1589
1590                let missing = expected.difference(&actual).collect::<Vec<_>>();
1591                assert!(
1592                    missing.is_empty(),
1593                    "missing tuples in trace5: {:?}",
1594                    missing
1595                );
1596            });
1597
1598            trace.apply3(&trace6, &watermark, move |trace, trace6, ts| {
1599                let mut all_tuples = BTreeMap::new();
1600                typed_batch::IndexedZSetReader::iter(trace.as_ref()).for_each(|(k, v, w)| {
1601                    all_tuples
1602                        .entry(k)
1603                        .or_insert_with(|| Vec::new())
1604                        .push((v, w))
1605                });
1606
1607                let expected =
1608                    test_retain_values_top_n(trace.as_ref(), ts.as_ref(), val_lateness, 3);
1609
1610                let actual =
1611                    typed_batch::IndexedZSetReader::iter(trace6.as_ref()).collect::<BTreeSet<_>>();
1612
1613                let missing = expected.difference(&actual).collect::<Vec<_>>();
1614                assert!(
1615                    missing.is_empty(),
1616                    "missing tuples in trace6: {:?}",
1617                    missing
1618                );
1619            });
1620
1621            Ok(handle)
1622        })
1623        .unwrap();
1624
1625        for batch in batches {
1626            let mut tuples = batch
1627                .into_iter()
1628                .map(|((k, v), r)| Tup2(k, Tup2(v, r)))
1629                .collect::<Vec<_>>();
1630            input_handle.append(&mut tuples);
1631            dbsp.transaction().unwrap();
1632        }
1633    }
1634
1635    /// Deterministic input that inserts and retracts records in order to simulate a situation where
1636    /// records present in a batch are not present in the trace because they were deleted in a subsequent batch.
1637    /// This triggered a bug in the initial implementation of LastN filters.
1638    #[test]
1639    fn test_integrate_trace_retain_with_deletions() {
1640        let mut batches: Vec<Vec<((i32, i32), ZWeight)>> = vec![];
1641        for i in 0..1000 {
1642            batches.push(vec![
1643                ((i, i), 1), // Keep this value, delete subsequent values
1644                ((i, i + 1), 1),
1645                ((i, i + 2), 1),
1646                ((i, i + 3), 1),
1647                ((i, i + 4), 1),
1648            ]);
1649            batches.push(vec![
1650                ((i, i + 1), -1),
1651                ((i, i + 2), -1),
1652                ((i, i + 3), -1),
1653                ((i, i + 4), -1),
1654            ]);
1655        }
1656
1657        test_integrate_trace_retain(batches, 10, 10);
1658    }
1659
1660    fn spine_batches_count_values(profile: &crate::profile::DbspProfile) -> Vec<(String, usize)> {
1661        let root = GlobalNodeId::root();
1662        let operator_names = spine_operator_names(profile);
1663
1664        // The root profile entry merges child metrics, so only inspect individual operators.
1665        let mut counts = profile
1666            .worker_profiles
1667            .iter()
1668            .flat_map(|profile| profile.attribute_profile(&SPINE_BATCHES_COUNT))
1669            .filter_map(|(node_id, value)| {
1670                (node_id != root).then(|| match value {
1671                    MetaItem::Count(count) => {
1672                        let operator_name = operator_names
1673                            .get(&node_id.node_identifier().to_string())
1674                            .cloned()
1675                            .unwrap_or_else(|| node_id.to_string());
1676                        (operator_name, count)
1677                    }
1678                    value => panic!("spine_batches_count must be serialized as a count: {value:?}"),
1679                })
1680            })
1681            .collect::<Vec<_>>();
1682
1683        counts.sort();
1684        counts
1685    }
1686
1687    fn spine_operator_names(profile: &crate::profile::DbspProfile) -> BTreeMap<String, String> {
1688        let mut operator_names = BTreeMap::new();
1689
1690        if let Some(graph) = &profile.graph {
1691            let graph_json: serde_json::Value = serde_json::from_str(&graph.to_json()).unwrap();
1692            collect_operator_names(&graph_json, &mut operator_names);
1693        }
1694
1695        operator_names
1696    }
1697
1698    fn collect_operator_names(
1699        value: &serde_json::Value,
1700        operator_names: &mut BTreeMap<String, String>,
1701    ) {
1702        match value {
1703            serde_json::Value::Object(object) => {
1704                if let (Some(id), Some(label)) = (
1705                    object.get("id").and_then(serde_json::Value::as_str),
1706                    object.get("label").and_then(serde_json::Value::as_str),
1707                ) {
1708                    let name = label
1709                        .split("\\l")
1710                        .next()
1711                        .unwrap_or(label)
1712                        .split(" @ ")
1713                        .next()
1714                        .unwrap_or(label);
1715                    operator_names.insert(id.to_owned(), name.to_owned());
1716                }
1717
1718                for value in object.values() {
1719                    collect_operator_names(value, operator_names);
1720                }
1721            }
1722            serde_json::Value::Array(array) => {
1723                for value in array {
1724                    collect_operator_names(value, operator_names);
1725                }
1726            }
1727            _ => {}
1728        }
1729    }
1730
1731    #[test]
1732    fn test_start_compaction_accumulate_integrate_trace() {
1733        const NUM_TRACES: usize = 3;
1734        const ITERATIONS: usize = 3;
1735        const BATCHES_PER_ITERATION: usize = 20;
1736        const TOTAL_BATCHES: usize = BATCHES_PER_ITERATION * ITERATIONS * (ITERATIONS + 1) / 2;
1737        const RECORDS_PER_BATCH: usize = 2000;
1738
1739        let storage_dir = tempfile::tempdir().unwrap();
1740        let circuit_config =
1741            CircuitConfig::with_workers(2).with_temporary_storage(storage_dir.path());
1742        let (mut dbsp, (input_handles, trace_outputs)) =
1743            Runtime::init_circuit(circuit_config, move |circuit| {
1744                // Build several independent trace integrals so one compaction request has to visit
1745                // multiple `accumulate_integrate_trace` operators in a storage-enabled circuit.
1746                let mut input_handles = Vec::new();
1747                let mut trace_outputs = Vec::new();
1748
1749                for _ in 0..NUM_TRACES {
1750                    let (stream, handle) = circuit.add_input_indexed_zset::<i32, i32>();
1751                    let trace = stream.shard().accumulate_integrate_trace();
1752                    let output = trace
1753                        .apply(|trace| trace.ro_snapshot().consolidate())
1754                        .output();
1755                    input_handles.push(handle);
1756                    trace_outputs.push(output);
1757                }
1758
1759                Ok((input_handles, trace_outputs))
1760            })
1761            .unwrap();
1762
1763        let mut next_batch = 0;
1764        for iteration in 0..ITERATIONS {
1765            // Each iteration appends more batches than the previous one.  This exercises
1766            // repeated compaction on traces that keep receiving additional input.
1767            let batches_this_iteration = (iteration + 1) * BATCHES_PER_ITERATION;
1768            let end_batch = next_batch + batches_this_iteration;
1769
1770            for batch in next_batch..end_batch {
1771                for (trace, input_handle) in input_handles.iter().enumerate() {
1772                    let mut tuples = Vec::with_capacity(RECORDS_PER_BATCH);
1773                    for record in 0..RECORDS_PER_BATCH {
1774                        // Use disjoint key ranges per trace so the output validation can
1775                        // reconstruct the expected integral exactly.
1776                        let key = (trace * TOTAL_BATCHES * RECORDS_PER_BATCH
1777                            + batch * RECORDS_PER_BATCH
1778                            + record) as i32;
1779                        tuples.push(Tup2(key, Tup2(record as i32, 1)));
1780                    }
1781
1782                    input_handle.append(&mut tuples);
1783                }
1784
1785                dbsp.transaction().unwrap();
1786            }
1787
1788            next_batch = end_batch;
1789
1790            let profile = dbsp.retrieve_profile().unwrap();
1791            let counts = spine_batches_count_values(&profile);
1792            println!(
1793                "SPINE_BATCHES_COUNT values before compaction iteration {iteration}: {counts:?}"
1794            );
1795            assert!(
1796                !counts.is_empty(),
1797                "profile should contain at least one SPINE_BATCHES_COUNT metric"
1798            );
1799
1800            dbsp.start_compaction().unwrap();
1801            thread::sleep(Duration::from_millis(100));
1802            // Starting compaction again while the previous request is still making progress
1803            // should be idempotent.
1804            dbsp.start_compaction().unwrap();
1805
1806            let deadline = Instant::now() + Duration::from_secs(60);
1807            loop {
1808                let profile = dbsp.retrieve_profile().unwrap();
1809                let counts = spine_batches_count_values(&profile);
1810                println!(
1811                    "SPINE_BATCHES_COUNT values after compaction request iteration {iteration}: {counts:?}"
1812                );
1813                assert!(
1814                    !counts.is_empty(),
1815                    "profile should contain at least one SPINE_BATCHES_COUNT metric"
1816                );
1817
1818                if counts.iter().all(|(_, count)| *count <= 1) {
1819                    break;
1820                }
1821
1822                assert!(
1823                    Instant::now() < deadline,
1824                    "timed out waiting for all SPINE_BATCHES_COUNT values to be <= 1: {counts:?}"
1825                );
1826                thread::sleep(Duration::from_secs(5));
1827            }
1828
1829            // After all spines report at most one batch, run one more logical step to
1830            // materialize the trace outputs and verify compaction preserved every record.
1831            dbsp.transaction().unwrap();
1832            for (trace, output) in trace_outputs.iter().enumerate() {
1833                let output = output.consolidate();
1834                let mut actual_tuples = output.iter();
1835
1836                for batch in 0..next_batch {
1837                    for record in 0..RECORDS_PER_BATCH {
1838                        let key = (trace * TOTAL_BATCHES * RECORDS_PER_BATCH
1839                            + batch * RECORDS_PER_BATCH
1840                            + record) as i32;
1841                        assert_eq!(actual_tuples.next(), Some((key, record as i32, 1)));
1842                    }
1843                }
1844
1845                assert_eq!(actual_tuples.next(), None);
1846            }
1847        }
1848    }
1849
1850    proptest! {
1851        #![proptest_config(ProptestConfig::with_cases(16))]
1852
1853        #[test]
1854        fn test_integrate_trace_retain_proptest(batches in quasi_monotone_batches(100, 20, 1000, 200, 100, 200)) {
1855            test_integrate_trace_retain(batches, 100, 1000);
1856        }
1857    }
1858}