1use crate::circuit::circuit_builder::{StreamId, register_replay_stream};
2use crate::circuit::metadata::{INPUT_RECORDS_COUNT, MEMORY_ALLOCATIONS_COUNT, RETAINMENT_BOUNDS};
3use crate::circuit::splitter_output_chunk_size;
4use crate::dynamic::{Factory, Weight, WeightTrait};
5use crate::operator::require_persistent_id;
6use crate::trace::spine_async::WithSnapshot;
7use crate::trace::{BatchReaderFactories, Builder, GroupFilter, MergeCursor};
8use crate::{
9 Error, Timestamp,
10 circuit::{
11 Circuit, ExportId, ExportStream, FeedbackConnector, GlobalNodeId, OwnershipPreference,
12 Scope, Stream, WithClock,
13 metadata::{
14 ALLOCATED_MEMORY_BYTES, MetaItem, OperatorMeta, SHARED_MEMORY_BYTES,
15 STATE_RECORDS_COUNT, USED_MEMORY_BYTES,
16 },
17 operator_traits::{BinaryOperator, Operator, StrictOperator, StrictUnaryOperator},
18 },
19 circuit_cache_key,
20 dynamic::DataTrait,
21 trace::{Batch, BatchReader, Filter, Spine, SpineSnapshot, Trace},
22};
23use dyn_clone::clone_box;
24use feldera_storage::{FileCommitter, StoragePath};
25use ouroboros::self_referencing;
26use size_of::SizeOf;
27use std::any::TypeId;
28use std::collections::BTreeMap;
29use std::mem::transmute;
30use std::{
31 borrow::Cow,
32 cell::{Ref, RefCell},
33 cmp::Ordering,
34 fmt::Debug,
35 marker::PhantomData,
36 ops::Deref,
37 rc::Rc,
38 sync::Arc,
39};
40
// Circuit cache key: maps the id of an input stream to the trace stream
// (`Stream<C, D>`) that was built for it, so the trace is created only once.
circuit_cache_key!(TraceId<C, D: BatchReader>(StreamId => Stream<C, D>));
// Circuit cache key: maps a stream id to the `TraceBounds` object shared by
// everything that constrains the retention of that stream's trace.
circuit_cache_key!(BoundsId<D: BatchReader>(StreamId => TraceBounds<<D as BatchReader>::Key, <D as BatchReader>::Val>));

// Circuit cache key for the delayed version of a trace. Entries in this file
// are inserted under the stream id of the *trace* stream (the output of the
// append operator), not the id of the input stream.
circuit_cache_key!(DelayedTraceId<C, D>(StreamId => Stream<C, D>));
46
/// A shared, optionally-set bound value.
///
/// The bound lives in an `Rc<RefCell<Option<Box<T>>>>`: cloning a
/// `TraceBound` yields another handle to the same cell, so a value written
/// through one handle is visible through all of them. `None` means that no
/// bound has been set yet.
#[repr(transparent)]
pub struct TraceBound<T: ?Sized>(Rc<RefCell<Option<Box<T>>>>);
57
58impl<T: DataTrait + ?Sized> Clone for TraceBound<T> {
59 fn clone(&self) -> Self {
60 Self(self.0.clone())
61 }
62}
63
64impl<T: Debug + ?Sized> Debug for TraceBound<T> {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 self.0.borrow().fmt(f)
67 }
68}
69
70impl<T: DataTrait + ?Sized> PartialEq for TraceBound<T> {
71 fn eq(&self, other: &Self) -> bool {
72 self.0 == other.0
73 }
74}
75
// Marker impl: equality of `TraceBound` is declared to be a full equivalence
// relation (it is delegated to the stored value by the `PartialEq` impl).
impl<T: DataTrait + ?Sized> Eq for TraceBound<T> {}
77
78impl<T: DataTrait + ?Sized> PartialOrd for TraceBound<T> {
79 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
80 Some(self.cmp(other))
81 }
82}
83
84impl<T: DataTrait + ?Sized> Ord for TraceBound<T> {
85 fn cmp(&self, other: &Self) -> Ordering {
86 self.0.cmp(&other.0)
87 }
88}
89
90impl<K: DataTrait + ?Sized> Default for TraceBound<K> {
91 fn default() -> Self {
92 Self(Rc::new(RefCell::new(None)))
93 }
94}
95
96impl<K: DataTrait + ?Sized> TraceBound<K> {
97 pub fn new() -> Self {
98 Default::default()
99 }
100
101 pub fn set(&self, bound: Box<K>) {
103 *self.0.borrow_mut() = Some(bound);
105 }
106
107 pub fn get(&self) -> Ref<'_, Option<Box<K>>> {
109 (*self.0).borrow()
110 }
111}
112
/// Shared handle to the retention constraints (key and value predicates) of
/// a trace.
///
/// Wraps an `Rc<RefCell<TraceBoundsInner<K, V>>>`, so all clones observe and
/// modify the same set of constraints.
pub struct TraceBounds<K: ?Sized + 'static, V: DataTrait + ?Sized>(
    Rc<RefCell<TraceBoundsInner<K, V>>>,
);
118
119impl<K: DataTrait + ?Sized, V: DataTrait + ?Sized> Clone for TraceBounds<K, V> {
120 fn clone(&self) -> Self {
121 Self(self.0.clone())
122 }
123}
124
125impl<K: DataTrait + ?Sized, V: DataTrait + ?Sized> TraceBounds<K, V> {
126 pub(crate) fn new() -> Self {
131 Self(Rc::new(RefCell::new(TraceBoundsInner {
132 key_predicate: Predicate::Bounds(Vec::new()),
133 unique_key_name: None,
134 val_predicate: GroupPredicate::Bounds(Vec::new()),
135 unique_val_name: None,
136 })))
137 }
138
139 pub(crate) fn unbounded() -> Self {
142 Self(Rc::new(RefCell::new(TraceBoundsInner {
143 key_predicate: Predicate::Bounds(vec![TraceBound::new()]),
144 unique_key_name: None,
145 val_predicate: GroupPredicate::Bounds(vec![TraceBound::new()]),
146 unique_val_name: None,
147 })))
148 }
149
150 pub(crate) fn add_key_bound(&self, bound: TraceBound<K>) {
151 match &mut self.0.borrow_mut().key_predicate {
152 Predicate::Bounds(bounds) => bounds.push(bound),
153 Predicate::Filter(_) => {}
154 };
155 }
156
157 pub(crate) fn set_key_filter(&self, filter: Filter<K>) {
160 self.0.borrow_mut().key_predicate = Predicate::Filter(filter);
161 }
162
163 pub(crate) fn set_unique_key_bound_name(&self, unique_name: Option<&str>) {
164 self.0.borrow_mut().unique_key_name = unique_name.map(str::to_string);
165 }
166
167 pub(crate) fn add_val_bound(&self, bound: TraceBound<V>) {
168 match &mut self.0.borrow_mut().val_predicate {
169 GroupPredicate::Bounds(bounds) => bounds.push(bound),
170 GroupPredicate::Filter(_) => {}
171 };
172 }
173
174 pub(crate) fn set_val_filter(&self, filter: GroupFilter<V>) {
177 self.0.borrow_mut().val_predicate = GroupPredicate::Filter(filter);
178 }
179
180 pub(crate) fn set_unique_val_bound_name(&self, unique_name: Option<&str>) {
181 self.0.borrow_mut().unique_val_name = unique_name.map(str::to_string);
182 }
183
184 pub(crate) fn effective_key_filter(&self) -> Option<Filter<K>> {
189 match &(*self.0).borrow().key_predicate {
190 Predicate::Bounds(bounds) => bounds
191 .iter()
192 .min()
193 .expect("At least one trace bound must be set")
194 .get()
195 .deref()
196 .as_ref()
197 .map(|bx| Arc::from(clone_box(bx.as_ref())))
198 .map(|bound: Arc<K>| {
199 let metadata = MetaItem::String(format!("{bound:?}"));
200 Filter::new(Box::new(move |k: &K| {
201 bound.as_ref().cmp(k) != Ordering::Greater
202 }))
203 .with_metadata(metadata)
204 }),
205 Predicate::Filter(filter) => Some(filter.clone()),
206 }
207 }
208
209 pub(crate) fn effective_val_filter(&self) -> Option<GroupFilter<V>> {
214 match &(*self.0).borrow().val_predicate {
215 GroupPredicate::Bounds(bounds) => bounds
216 .iter()
217 .min()
218 .expect("At least one trace bound must be set")
219 .get()
220 .deref()
221 .as_ref()
222 .map(|bx| Arc::from(clone_box(bx.as_ref())))
223 .map(|bound: Arc<V>| {
224 let metadata = MetaItem::String(format!("{bound:?}"));
225 GroupFilter::Simple(
226 Filter::new(Box::new(move |v: &V| {
227 bound.as_ref().cmp(v) != Ordering::Greater
228 }))
229 .with_metadata(metadata),
230 )
231 }),
232 GroupPredicate::Filter(filter) => Some(filter.clone()),
233 }
234 }
235
236 pub(crate) fn metadata(&self) -> MetaItem {
237 self.0.borrow().metadata()
238 }
239}
240
/// Retention predicate over keys: either a list of lower bounds (the minimum
/// of which is turned into a filter by `TraceBounds::effective_key_filter`)
/// or an explicit filter.
enum Predicate<V: ?Sized> {
    /// Lower bounds registered via `TraceBounds::add_key_bound`.
    Bounds(Vec<TraceBound<V>>),
    /// Explicit filter installed via `TraceBounds::set_key_filter`.
    Filter(Filter<V>),
}
249
250impl<V: Debug + ?Sized> Predicate<V> {
251 pub fn metadata(&self) -> MetaItem {
252 match self {
253 Self::Bounds(bounds) => MetaItem::Array(
254 bounds
255 .iter()
256 .map(|b| MetaItem::String(format!("{b:?}")))
257 .collect(),
258 ),
259 Self::Filter(filter) => filter.metadata().clone(),
260 }
261 }
262}
263
/// Retention predicate over values: either a list of lower bounds (the
/// minimum of which is turned into a filter by
/// `TraceBounds::effective_val_filter`) or an explicit group filter.
enum GroupPredicate<V: DataTrait + ?Sized> {
    /// Lower bounds registered via `TraceBounds::add_val_bound`.
    Bounds(Vec<TraceBound<V>>),
    /// Explicit group filter installed via `TraceBounds::set_val_filter`.
    Filter(GroupFilter<V>),
}
270
271impl<V: DataTrait + ?Sized> GroupPredicate<V> {
272 pub fn metadata(&self) -> MetaItem {
273 match self {
274 Self::Bounds(bounds) => MetaItem::Array(
275 bounds
276 .iter()
277 .map(|b| MetaItem::String(format!("{b:?}")))
278 .collect(),
279 ),
280 Self::Filter(filter) => filter.metadata().clone(),
281 }
282 }
283}
284
/// State shared by all clones of a `TraceBounds`.
struct TraceBoundsInner<K: ?Sized + 'static, V: DataTrait + ?Sized> {
    /// Retention predicate applied to keys.
    key_predicate: Predicate<K>,
    /// Name recorded by `TraceBounds::set_unique_key_bound_name`; it is only
    /// written, never read, in the code visible here.
    unique_key_name: Option<String>,
    /// Retention predicate applied to values.
    val_predicate: GroupPredicate<V>,
    /// Name recorded by `TraceBounds::set_unique_val_bound_name`; it is only
    /// written, never read, in the code visible here.
    unique_val_name: Option<String>,
}
293
294impl<K: Debug + ?Sized + 'static, V: DataTrait + ?Sized> TraceBoundsInner<K, V> {
295 pub fn metadata(&self) -> MetaItem {
296 MetaItem::Map(BTreeMap::from([
297 (Cow::Borrowed("key"), self.key_predicate.metadata()),
298 (Cow::Borrowed("value"), self.val_predicate.metadata()),
299 ]))
300 }
301}
302
/// A `Spine` whose batches are the timed version of batch type `B`, as
/// selected by the timestamp type of the clock `C`
/// (`<C::Time as Timestamp>::TimedBatch<B>`).
pub type TimedSpine<B, C> = Spine<<<C as WithClock>::Time as Timestamp>::TimedBatch<B>>;
304
impl<C, B> Stream<C, B>
where
    C: Circuit,
    B: Clone + Send + Sync + 'static,
{
    /// Builds (or returns the cached) timed trace of this stream without any
    /// caller-supplied retention bounds.
    ///
    /// Equivalent to `dyn_trace_with_bound` with two fresh, unset bounds.
    pub fn dyn_trace(
        &self,
        output_factories: &<TimedSpine<B, C> as BatchReader>::Factories,
        batch_factories: &B::Factories,
    ) -> Stream<C, TimedSpine<B, C>>
    where
        B: Batch<Time = ()>,
    {
        self.dyn_trace_with_bound(
            output_factories,
            batch_factories,
            TraceBound::new(),
            TraceBound::new(),
        )
    }

    /// Builds (or returns the cached) timed trace of this stream, registering
    /// `lower_key_bound` and `lower_val_bound` with the stream's shared
    /// `TraceBounds`.
    ///
    /// The trace stream is cached under `TraceId` for this stream's id, so
    /// the operators are only instantiated on the first call. The bounds are
    /// registered on every call, including calls that hit the cache.
    pub fn dyn_trace_with_bound(
        &self,
        output_factories: &<TimedSpine<B, C> as BatchReader>::Factories,
        batch_factories: &B::Factories,
        lower_key_bound: TraceBound<B::Key>,
        lower_val_bound: TraceBound<B::Val>,
    ) -> Stream<C, TimedSpine<B, C>>
    where
        B: Batch<Time = ()>,
    {
        // Register the bounds before consulting the cache so that they take
        // effect even when the trace already exists.
        let bounds = self.trace_bounds_with_bound(lower_key_bound, lower_val_bound);

        self.circuit()
            .cache_get_or_insert_with(TraceId::new(self.stream_id()), || {
                let circuit = self.circuit();

                circuit.region("trace", || {
                    let persistent_id = self.get_persistent_id();
                    // `false` is the `reset_on_clock_start` argument.
                    let z1 = Z1Trace::new(
                        output_factories,
                        batch_factories,
                        false,
                        circuit.root_scope(),
                        bounds.clone(),
                    );
                    // The feedback operator is persisted under
                    // "<stream persistent id>.integral" when the stream has a
                    // persistent id.
                    let (delayed_trace, z1feedback) = circuit.add_feedback_persistent(
                        persistent_id
                            .map(|name| format!("{name}.integral"))
                            .as_deref(),
                        z1,
                    );

                    let replay_stream = z1feedback.operator_mut().prepare_replay_stream(self);

                    // trace = delayed_trace + self
                    let trace = circuit.add_binary_operator_with_preference(
                        <TraceAppend<TimedSpine<B, C>, B, C>>::new(
                            output_factories,
                            circuit.clone(),
                        ),
                        (&delayed_trace, OwnershipPreference::STRONGLY_PREFER_OWNED),
                        (self, OwnershipPreference::PREFER_OWNED),
                    );
                    // Propagate the sharded marker from the input stream.
                    if self.is_sharded() {
                        delayed_trace.mark_sharded();
                        trace.mark_sharded();
                    }

                    // Close the loop: the trace feeds back into the Z1 operator.
                    z1feedback.connect_with_preference(
                        &trace,
                        OwnershipPreference::STRONGLY_PREFER_OWNED,
                    );

                    register_replay_stream(circuit, self, &replay_stream);

                    // Make the delayed trace discoverable from the trace
                    // stream's id.
                    circuit.cache_insert(DelayedTraceId::new(trace.stream_id()), delayed_trace);
                    trace
                })
            })
            .clone()
    }

    /// Installs a key retention filter that is recomputed from every value
    /// of `bounds_stream`.
    ///
    /// On each value `ts`, `retain_key_func(ts)` produces a filter that
    /// replaces the key predicate of this stream's shared `TraceBounds`. The
    /// persistent id of `bounds_stream` (if any) is recorded as the key bound
    /// name.
    #[track_caller]
    pub fn dyn_integrate_trace_retain_keys<TS>(
        &self,
        bounds_stream: &Stream<C, Box<TS>>,
        retain_key_func: Box<dyn Fn(&TS) -> Filter<B::Key>>,
    ) where
        B: Batch<Time = ()>,
        TS: DataTrait + ?Sized,
        Box<TS>: Clone,
    {
        let bounds = self.trace_bounds();
        bounds.set_unique_key_bound_name(bounds_stream.get_persistent_id().as_deref());
        bounds_stream.inspect(move |ts| {
            let filter = retain_key_func(ts.as_ref());
            bounds.set_key_filter(filter);
        });
    }

    /// Installs a value retention filter that is recomputed from every value
    /// of `bounds_stream`.
    ///
    /// The filter returned by `retain_val_func` is wrapped in
    /// `GroupFilter::Simple` and replaces the value predicate of this
    /// stream's shared `TraceBounds`.
    #[track_caller]
    pub fn dyn_integrate_trace_retain_values<TS>(
        &self,
        bounds_stream: &Stream<C, Box<TS>>,
        retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>,
    ) where
        B: Batch<Time = ()>,
        TS: DataTrait + ?Sized,
        Box<TS>: Clone,
    {
        let bounds = self.trace_bounds();
        bounds.set_unique_val_bound_name(bounds_stream.get_persistent_id().as_deref());

        bounds_stream.inspect(move |ts| {
            let filter = GroupFilter::Simple(retain_val_func(ts.as_ref()));
            bounds.set_val_filter(filter);
        });
    }

    /// Like `dyn_integrate_trace_retain_values`, but wraps the filter in
    /// `GroupFilter::LastN(n, ..)`.
    #[track_caller]
    pub fn dyn_integrate_trace_retain_values_last_n<TS>(
        &self,
        bounds_stream: &Stream<C, Box<TS>>,
        retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>,
        n: usize,
    ) where
        B: Batch<Time = ()>,
        TS: DataTrait + ?Sized,
        Box<TS>: Clone,
    {
        let bounds = self.trace_bounds();
        bounds.set_unique_val_bound_name(bounds_stream.get_persistent_id().as_deref());

        bounds_stream.inspect(move |ts| {
            let filter = GroupFilter::LastN(n, retain_val_func(ts.as_ref()));
            bounds.set_val_filter(filter);
        });
    }

    /// Like `dyn_integrate_trace_retain_values`, but wraps the filter in
    /// `GroupFilter::TopN(n, .., val_factory)`.
    #[track_caller]
    pub fn dyn_integrate_trace_retain_values_top_n<TS>(
        &self,
        val_factory: &'static dyn Factory<B::Val>,
        bounds_stream: &Stream<C, Box<TS>>,
        retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>,
        n: usize,
    ) where
        B: Batch<Time = ()>,
        TS: DataTrait + ?Sized,
        Box<TS>: Clone,
    {
        let bounds = self.trace_bounds();
        bounds.set_unique_val_bound_name(bounds_stream.get_persistent_id().as_deref());

        bounds_stream.inspect(move |ts| {
            let filter = GroupFilter::TopN(n, retain_val_func(ts.as_ref()), val_factory);
            bounds.set_val_filter(filter);
        });
    }

    /// Returns the `TraceBounds` shared by all traces of this stream,
    /// creating an empty one on first use.
    ///
    /// The bounds are cached under the stream id of
    /// `self.try_unsharded_version()` — presumably so that a stream and its
    /// sharded version share one set of bounds (confirm against
    /// `try_unsharded_version`).
    fn trace_bounds(&self) -> TraceBounds<B::Key, B::Val>
    where
        B: BatchReader,
    {
        let stream_id = self.try_unsharded_version().stream_id();

        self.circuit()
            .cache_get_or_insert_with(BoundsId::<B>::new(stream_id), TraceBounds::new)
            .clone()
    }

    /// Returns the shared `TraceBounds` of this stream after registering the
    /// given key and value bounds with it.
    fn trace_bounds_with_bound(
        &self,
        lower_key_bound: TraceBound<B::Key>,
        lower_val_bound: TraceBound<B::Val>,
    ) -> TraceBounds<B::Key, B::Val>
    where
        B: BatchReader,
    {
        let bounds = self.trace_bounds();
        bounds.add_key_bound(lower_key_bound);
        bounds.add_val_bound(lower_val_bound);
        bounds
    }

    /// Builds (or returns the cached) untimed integral trace of this stream
    /// without caller-supplied retention bounds.
    ///
    /// Equivalent to `dyn_integrate_trace_with_bound` with two fresh, unset
    /// bounds.
    #[track_caller]
    pub fn dyn_integrate_trace(&self, factories: &B::Factories) -> Stream<C, Spine<B>>
    where
        B: Batch<Time = ()>,
        Spine<B>: SizeOf,
    {
        self.dyn_integrate_trace_with_bound(factories, TraceBound::new(), TraceBound::new())
    }

    /// Builds (or returns the cached) untimed integral trace of this stream,
    /// registering the given bounds with the stream's shared `TraceBounds`.
    pub fn dyn_integrate_trace_with_bound(
        &self,
        factories: &B::Factories,
        lower_key_bound: TraceBound<B::Key>,
        lower_val_bound: TraceBound<B::Val>,
    ) -> Stream<C, Spine<B>>
    where
        B: Batch<Time = ()>,
        Spine<B>: SizeOf,
    {
        self.integrate_trace_inner(
            factories,
            self.trace_bounds_with_bound(lower_key_bound, lower_val_bound),
        )
    }

    /// Instantiates the integral-trace subcircuit on first use and caches it
    /// under `TraceId` for this stream's id.
    ///
    /// Besides the trace itself, the delayed trace and the exported trace
    /// are cached under `DelayedTraceId` and `ExportId`, both keyed by the
    /// trace stream's id.
    #[allow(clippy::type_complexity)]
    fn integrate_trace_inner(
        &self,
        input_factories: &B::Factories,
        bounds: TraceBounds<B::Key, B::Val>,
    ) -> Stream<C, Spine<B>>
    where
        B: Batch<Time = ()>,
        Spine<B>: SizeOf,
    {
        self.circuit()
            .cache_get_or_insert_with(TraceId::new(self.stream_id()), || {
                let circuit = self.circuit();
                let bounds = bounds.clone();

                let persistent_id = self.get_persistent_id();

                circuit.region("integrate_trace", || {
                    // The same factories are passed for the trace and for
                    // the batches; `true` is the `reset_on_clock_start`
                    // argument.
                    let z1 = Z1Trace::new(
                        input_factories,
                        input_factories,
                        true,
                        circuit.root_scope(),
                        bounds,
                    );

                    // `local` is the delayed trace inside this circuit;
                    // `export` is the stream returned for export.
                    let (
                        ExportStream {
                            local: delayed_trace,
                            export,
                        },
                        z1feedback,
                    ) = circuit.add_feedback_with_export_persistent(
                        persistent_id
                            .map(|name| format!("{name}.integral"))
                            .as_deref(),
                        z1,
                    );

                    let replay_stream = z1feedback.operator_mut().prepare_replay_stream(self);

                    // trace = delayed_trace + self
                    let trace = circuit.add_binary_operator_with_preference(
                        UntimedTraceAppend::<Spine<B>>::new(),
                        (&delayed_trace, OwnershipPreference::STRONGLY_PREFER_OWNED),
                        (self, OwnershipPreference::PREFER_OWNED),
                    );

                    // Propagate the sharded marker from the input stream.
                    if self.is_sharded() {
                        delayed_trace.mark_sharded();
                        trace.mark_sharded();
                    }

                    // Close the loop: the trace feeds back into the Z1 operator.
                    z1feedback.connect_with_preference(
                        &trace,
                        OwnershipPreference::STRONGLY_PREFER_OWNED,
                    );

                    register_replay_stream(circuit, self, &replay_stream);

                    circuit.cache_insert(DelayedTraceId::new(trace.stream_id()), delayed_trace);
                    circuit.cache_insert(ExportId::new(trace.stream_id()), export);

                    trace
                })
            })
            .clone()
    }
}
607
/// Half-built integral trace: the Z1 feedback operator exists, but the input
/// stream has not been attached yet.
///
/// Created by `TraceFeedback::add_integrate_trace_feedback` and completed by
/// `TraceFeedbackConnector::connect`.
pub struct TraceFeedbackConnector<C, T>
where
    C: Circuit,
    T: Trace,
{
    /// Connector used to attach the trace stream to the Z1 operator's input.
    feedback: FeedbackConnector<C, T, T, Z1Trace<C, T::Batch, T>>,
    /// Output of the Z1 operator: the delayed trace, usable before `connect`
    /// is called.
    pub delayed_trace: Stream<C, T>,
    /// Export stream of the Z1 operator, living in the parent circuit.
    export_trace: Stream<C::Parent, T>,
    /// Retention bounds handed to the Z1 operator.
    bounds: TraceBounds<T::Key, T::Val>,
}
621
impl<C, T> TraceFeedbackConnector<C, T>
where
    T: Trace<Time = ()> + Clone,
    C: Circuit,
{
    /// Completes the integral by attaching `stream` as the batch input.
    ///
    /// Adds an `UntimedTraceAppend` operator computing
    /// `delayed_trace + stream`, feeds its output back into the Z1 operator,
    /// registers the replay stream, and populates the circuit cache with the
    /// delayed trace, the trace, the bounds and the export stream.
    pub fn connect(self, stream: &Stream<C, T::Batch>) {
        let circuit = self.delayed_trace.circuit();

        let replay_stream = self.feedback.operator_mut().prepare_replay_stream(stream);

        let trace = circuit.add_binary_operator_with_preference(
            <UntimedTraceAppend<T>>::new(),
            (
                &self.delayed_trace,
                OwnershipPreference::STRONGLY_PREFER_OWNED,
            ),
            (stream, OwnershipPreference::PREFER_OWNED),
        );

        // Propagate the sharded marker from the input stream.
        if stream.is_sharded() {
            self.delayed_trace.mark_sharded();
            trace.mark_sharded();
        }

        // Close the loop: the trace feeds back into the Z1 operator.
        self.feedback
            .connect_with_preference(&trace, OwnershipPreference::STRONGLY_PREFER_OWNED);

        register_replay_stream(circuit, stream, &replay_stream);

        // The delayed trace and the export are keyed by the trace stream's id ...
        circuit.cache_insert(
            DelayedTraceId::new(trace.stream_id()),
            self.delayed_trace.clone(),
        );

        // ... while the trace and the bounds are keyed by the input
        // stream's id.
        circuit.cache_insert(TraceId::new(stream.stream_id()), trace.clone());
        circuit.cache_insert(
            BoundsId::<T::Batch>::new(stream.stream_id()),
            self.bounds.clone(),
        );
        circuit.cache_insert(ExportId::new(trace.stream_id()), self.export_trace);
    }
}
664
665pub trait TraceFeedback: Circuit {
687 fn add_integrate_trace_feedback<T>(
688 &self,
689 persistent_id: Option<&str>,
690 factories: &T::Factories,
691 bounds: TraceBounds<T::Key, T::Val>,
692 ) -> TraceFeedbackConnector<Self, T>
693 where
694 T: Trace<Time = ()> + Clone,
695 {
696 let (ExportStream { local, export }, feedback) = self.add_feedback_with_export_persistent(
698 persistent_id
699 .map(|name| format!("{name}.integral"))
700 .as_deref(),
701 Z1Trace::new(
702 factories,
703 factories,
704 true,
705 self.root_scope(),
706 bounds.clone(),
707 ),
708 );
709
710 TraceFeedbackConnector {
711 feedback,
712 delayed_trace: local,
713 export_trace: export,
714 bounds,
715 }
716 }
717}
718
// Blanket impl: every `Circuit` gets `add_integrate_trace_feedback`.
impl<C: Circuit> TraceFeedback for C {}
720
impl<C, B> Stream<C, Spine<B>>
where
    C: Circuit,
    B: Batch,
{
    /// Returns a stream of read-only snapshots of the delayed version of
    /// this trace.
    ///
    /// `self` must be a trace stream for which a delayed trace was cached
    /// under `DelayedTraceId` (as done by the trace constructors in this
    /// file).
    ///
    /// # Panics
    ///
    /// Panics if no delayed trace is cached for this stream.
    pub fn delay_trace(&self) -> Stream<C, SpineSnapshot<B>> {
        // The cache lookup is a temporary that ends with this statement;
        // only the cloned stream is kept for the `apply` call below.
        let delayed_trace = self
            .circuit()
            .cache_get_or_insert_with(DelayedTraceId::new(self.stream_id()), || {
                panic!("called `.delay_trace()` on a stream without a previously created trace")
            })
            .deref()
            .clone();
        delayed_trace.apply(|spine: &Spine<B>| spine.ro_snapshot())
    }
}
740
/// Binary operator that inserts a batch into an owned trace of type `T` and
/// returns the updated trace.
pub struct UntimedTraceAppend<T>
where
    T: Trace,
{
    /// Running total of `batch.len()` over all batches inserted so far;
    /// reported as `INPUT_RECORDS_COUNT`.
    num_inputs: usize,

    _phantom: PhantomData<T>,
}
750
751impl<T> Default for UntimedTraceAppend<T>
752where
753 T: Trace,
754{
755 fn default() -> Self {
756 Self::new()
757 }
758}
759
760impl<T> UntimedTraceAppend<T>
761where
762 T: Trace,
763{
764 pub fn new() -> Self {
765 Self {
766 num_inputs: 0,
767 _phantom: PhantomData,
768 }
769 }
770}
771
772impl<T> Operator for UntimedTraceAppend<T>
773where
774 T: Trace + 'static,
775{
776 fn name(&self) -> Cow<'static, str> {
777 Cow::from("UntimedTraceAppend")
778 }
779
780 fn metadata(&self, meta: &mut OperatorMeta) {
781 meta.extend(metadata! {
782 INPUT_RECORDS_COUNT => MetaItem::Count(self.num_inputs),
783 });
784 }
785
786 fn fixedpoint(&self, _scope: Scope) -> bool {
787 true
788 }
789}
790
791impl<T> BinaryOperator<T, T::Batch, T> for UntimedTraceAppend<T>
792where
793 T: Trace + 'static,
794{
795 async fn eval(&mut self, _trace: &T, _batch: &T::Batch) -> T {
796 panic!("UntimedTraceAppend::eval(): cannot accept trace by reference")
799 }
800
801 async fn eval_owned_and_ref(&mut self, mut trace: T, batch: &T::Batch) -> T {
802 self.num_inputs += batch.len();
803 trace.insert(batch.clone()).await;
804 trace
805 }
806
807 async fn eval_ref_and_owned(&mut self, _trace: &T, _batch: T::Batch) -> T {
808 panic!("UntimedTraceAppend::eval_ref_and_owned(): cannot accept trace by reference")
811 }
812
813 async fn eval_owned(&mut self, mut trace: T, batch: T::Batch) -> T {
814 self.num_inputs += batch.len();
815
816 trace.insert(batch).await;
817 trace
818 }
819
820 fn input_preference(&self) -> (OwnershipPreference, OwnershipPreference) {
821 (
822 OwnershipPreference::PREFER_OWNED,
823 OwnershipPreference::PREFER_OWNED,
824 )
825 }
826}
827
/// Binary operator that inserts untimed batches of type `B` into an owned
/// trace of type `T`, converting each batch to `T::Batch` with the current
/// time of `clock`.
pub struct TraceAppend<T: Trace, B: BatchReader, C> {
    /// Source of the timestamp attached to converted batches.
    clock: C,
    /// Factories used when converting an input batch into `T::Batch`.
    output_factories: T::Factories,

    /// Running total of `batch.len()` over all batches inserted so far;
    /// reported as `INPUT_RECORDS_COUNT`.
    num_inputs: usize,

    _phantom: PhantomData<(T, B)>,
}
837
838impl<T: Trace, B: BatchReader, C> TraceAppend<T, B, C> {
839 pub fn new(output_factories: &T::Factories, clock: C) -> Self {
840 Self {
841 clock,
842 output_factories: output_factories.clone(),
843 num_inputs: 0,
844 _phantom: PhantomData,
845 }
846 }
847}
848
849impl<T, B, Clk> Operator for TraceAppend<T, B, Clk>
850where
851 T: Trace,
852 B: BatchReader,
853 Clk: 'static,
854{
855 fn name(&self) -> Cow<'static, str> {
856 Cow::from("TraceAppend")
857 }
858 fn fixedpoint(&self, _scope: Scope) -> bool {
859 true
860 }
861
862 fn metadata(&self, meta: &mut OperatorMeta) {
863 meta.extend(metadata! {
864 INPUT_RECORDS_COUNT => MetaItem::Count(self.num_inputs),
865 });
866 }
867}
868
impl<T, B, Clk> BinaryOperator<T, B, T> for TraceAppend<T, B, Clk>
where
    B: BatchReader<Time = ()>,
    Clk: WithClock + 'static,
    T: Trace<Key = B::Key, Val = B::Val, R = B::R, Time = Clk::Time>,
{
    /// Not implemented: the trace must be passed by value. Always panics.
    async fn eval(&mut self, _trace: &T, _batch: &B) -> T {
        unimplemented!()
    }

    /// Converts `batch` into a `T::Batch` stamped with the current clock
    /// time, inserts it into the owned `trace` and returns the trace.
    async fn eval_owned_and_ref(&mut self, mut trace: T, batch: &B) -> T {
        self.num_inputs += batch.len();
        trace
            .insert(T::Batch::from_batch(
                batch,
                &self.clock.time(),
                &self.output_factories,
            ))
            .await;
        trace
    }

    /// Not implemented: the trace must be passed by value. Always panics.
    async fn eval_ref_and_owned(&mut self, _trace: &T, _batch: B) -> T {
        unimplemented!()
    }

    /// Inserts the owned `batch` into the owned `trace` and returns the
    /// trace.
    ///
    /// When `B` and `T::Batch` are the same type, the batch is moved into
    /// the trace as is; otherwise it is converted with
    /// `T::Batch::from_batch` using the current clock time.
    async fn eval_owned(&mut self, mut trace: T, batch: B) -> T {
        self.num_inputs += batch.len();

        if TypeId::of::<B>() == TypeId::of::<T::Batch>() {
            // Park the batch in an `Option` so it can be moved out through
            // the reinterpreted reference without copying or double-dropping.
            let mut batch = Some(batch);
            // SAFETY: the `TypeId` comparison above established that `B` and
            // `T::Batch` are the same type, so `&mut Option<B>` and
            // `&mut Option<T::Batch>` refer to the same type and this cast
            // does not change the layout or validity of the referent.
            let batch = unsafe { transmute::<&mut Option<B>, &mut Option<T::Batch>>(&mut batch) };
            trace.insert(batch.take().unwrap()).await;
        } else {
            trace
                .insert(T::Batch::from_batch(
                    &batch,
                    &self.clock.time(),
                    &self.output_factories,
                ))
                .await;
        }
        trace
    }

    /// Both inputs (trace, batch) are preferably consumed by value.
    fn input_preference(&self) -> (OwnershipPreference, OwnershipPreference) {
        (
            OwnershipPreference::PREFER_OWNED,
            OwnershipPreference::PREFER_OWNED,
        )
    }
}
927
/// Self-referencing pair (generated via `ouroboros`) of a trace and a merge
/// cursor that borrows from it.
///
/// Used by `Z1Trace` to iterate over the contents of a trace across multiple
/// steps during replay.
#[self_referencing]
struct ReplayState<T: Trace> {
    /// The trace being replayed; owned so the cursor can borrow from it.
    trace: T,
    /// Cursor over `trace`; its lifetime is tied to the `trace` field.
    #[borrows(trace)]
    #[covariant]
    cursor: Box<dyn MergeCursor<T::Key, T::Val, T::Time, T::R> + Send + 'this>,
}
935
936impl<T: Trace> ReplayState<T> {
937 fn create(trace: T) -> Self {
938 ReplayStateBuilder {
939 trace,
940 cursor_builder: |trace| trace.merge_cursor(None, None),
941 }
942 .build()
943 }
944}
945
/// Strict (delay) operator specialized for traces: it hands out the trace it
/// received in the previous step, after applying the retention bounds, and
/// can replay the trace contents as a stream of batches of type `B`.
pub struct Z1Trace<C: Circuit, B: Batch, T: Trace> {
    /// Global id of this operator; set in `init`.
    global_id: GlobalNodeId,
    /// Current logical time; advanced in `clock_end` and when an input flush
    /// is observed in `eval_strict_owned`.
    time: T::Time,
    /// The stored trace. `get_output` takes it out; `eval_strict_owned` puts
    /// the updated trace back.
    trace: Option<T>,
    /// Cursor over the trace being replayed; `Some` while a replay is in
    /// progress.
    replay_state: Option<ReplayState<T>>,
    /// Set by `start_replay`, cleared by `end_replay`.
    replay_mode: bool,
    /// Factories used to create empty traces.
    trace_factories: T::Factories,
    /// Per-scope dirty flags, indexed by scope; sized `root_scope + 1`.
    dirty: Vec<bool>,
    /// Scope value passed to `new` by the callers as `circuit.root_scope()`.
    root_scope: Scope,
    /// When `false`, `clock_end` updates the trace frontier; must be `true`
    /// for `get_final_output` to be called.
    reset_on_clock_start: bool,
    /// Retention bounds applied to the trace in `eval_strict_owned`.
    bounds: TraceBounds<T::Key, T::Val>,

    /// Factories used to build replay batches.
    batch_factories: B::Factories,
    /// Stream that receives replayed batches; set by
    /// `prepare_replay_stream`.
    delta_stream: Option<Stream<C, B>>,
    /// Set by `flush`; cleared in `get_output` once the output (or replay)
    /// is complete.
    flush_output: bool,
    /// Set by `flush_input`; cleared in `eval_strict_owned`.
    flush_input: bool,
}
968
969impl<C, B, T> Z1Trace<C, B, T>
970where
971 C: Circuit,
972 B: Batch,
973 T: Trace,
974{
975 pub fn new(
976 trace_factories: &T::Factories,
977 batch_factories: &B::Factories,
978 reset_on_clock_start: bool,
979 root_scope: Scope,
980 bounds: TraceBounds<T::Key, T::Val>,
981 ) -> Self {
982 Self {
983 global_id: GlobalNodeId::root(),
984 time: <T::Time as Timestamp>::clock_start(),
985 trace: None,
986 replay_state: None,
987 replay_mode: false,
988 trace_factories: trace_factories.clone(),
989 batch_factories: batch_factories.clone(),
990 dirty: vec![false; root_scope as usize + 1],
991 root_scope,
992 reset_on_clock_start,
993 bounds,
994 delta_stream: None,
995 flush_output: false,
996 flush_input: false,
997 }
998 }
999
1000 pub fn prepare_replay_stream(&mut self, stream: &Stream<C, B>) -> Stream<C, B> {
1026 let replay_stream = Stream::with_value(
1027 stream.circuit().clone(),
1028 self.global_id.local_node_id().unwrap(),
1029 stream.value(),
1030 );
1031
1032 self.delta_stream = Some(replay_stream.clone());
1033 replay_stream
1034 }
1035}
1036
1037impl<C, B, T> Operator for Z1Trace<C, B, T>
1038where
1039 C: Circuit,
1040 B: Batch,
1041 T: Trace,
1042{
1043 fn name(&self) -> Cow<'static, str> {
1044 Cow::from("Z1 (trace)")
1045 }
1046
1047 fn clock_start(&mut self, scope: Scope) {
1048 self.dirty[scope as usize] = false;
1049
1050 if scope == 0 && self.trace.is_none() {
1051 self.trace = Some(T::new(&self.trace_factories));
1052 }
1053 }
1054
1055 fn clock_end(&mut self, scope: Scope) {
1056 if scope + 1 == self.root_scope
1057 && !self.reset_on_clock_start
1058 && let Some(tr) = self.trace.as_mut()
1059 {
1060 tr.set_frontier(&self.time.epoch_start(scope));
1061 }
1062 self.time = self.time.advance(scope + 1);
1063 }
1064
1065 fn init(&mut self, global_id: &GlobalNodeId) {
1066 self.global_id = global_id.clone();
1067 }
1068
1069 fn metadata(&self, meta: &mut OperatorMeta) {
1070 let total_size = self
1071 .trace
1072 .as_ref()
1073 .map(|trace| trace.num_entries_deep())
1074 .unwrap_or(0);
1075
1076 let bytes = self
1077 .trace
1078 .as_ref()
1079 .map(|trace| trace.size_of())
1080 .unwrap_or_default();
1081
1082 meta.extend(metadata! {
1083 STATE_RECORDS_COUNT => MetaItem::Count(total_size),
1084 ALLOCATED_MEMORY_BYTES => MetaItem::bytes(bytes.total_bytes()),
1085 USED_MEMORY_BYTES => MetaItem::bytes(bytes.used_bytes()),
1086 MEMORY_ALLOCATIONS_COUNT => MetaItem::Count(bytes.distinct_allocations()),
1087 SHARED_MEMORY_BYTES => MetaItem::bytes(bytes.shared_bytes()),
1088 RETAINMENT_BOUNDS => self.bounds.metadata()
1089 });
1090
1091 if let Some(trace) = self.trace.as_ref() {
1092 trace.metadata(meta);
1093 }
1094 }
1095
1096 fn fixedpoint(&self, scope: Scope) -> bool {
1097 !self.dirty[scope as usize] && self.replay_state.is_none()
1098 }
1099
1100 fn checkpoint(
1101 &mut self,
1102 base: &StoragePath,
1103 pid: Option<&str>,
1104 files: &mut Vec<Arc<dyn FileCommitter>>,
1105 ) -> Result<(), Error> {
1106 let pid = require_persistent_id(pid, &self.global_id)?;
1107 self.trace
1108 .as_mut()
1109 .map(|trace| trace.save(base, pid, files))
1110 .unwrap_or(Ok(()))
1111 }
1112
1113 fn restore(&mut self, base: &StoragePath, pid: Option<&str>) -> Result<(), Error> {
1114 let pid = require_persistent_id(pid, &self.global_id)?;
1115
1116 self.trace
1117 .as_mut()
1118 .map(|trace| trace.restore(base, pid))
1119 .unwrap_or(Ok(()))
1120 }
1121
1122 fn clear_state(&mut self) -> Result<(), Error> {
1123 self.trace = Some(T::new(&self.trace_factories));
1125 self.replay_state = None;
1126 self.dirty = vec![false; self.root_scope as usize + 1];
1127
1128 Ok(())
1129 }
1130
1131 fn start_replay(&mut self) -> Result<(), Error> {
1132 self.replay_mode = true;
1140 if self.delta_stream.is_some() && self.replay_state.is_none() {
1141 let trace = self.trace.take().expect("Z1Trace::start_replay: no trace");
1142 self.trace = Some(T::new(&self.trace_factories));
1143
1144 self.replay_state = Some(ReplayState::create(trace));
1147 }
1148
1149 Ok(())
1150 }
1151
1152 fn is_replay_complete(&self) -> bool {
1153 self.replay_state.is_none()
1154 }
1155
1156 fn end_replay(&mut self) -> Result<(), Error> {
1157 self.replay_mode = false;
1159 self.replay_state = None;
1160
1161 Ok(())
1162 }
1163
1164 fn flush(&mut self) {
1165 self.flush_output = true;
1166 }
1167
1168 fn is_flush_complete(&self) -> bool {
1169 !self.flush_output
1170 }
1171
1172 fn start_compaction(&mut self) {
1173 if let Some(trace) = self.trace.as_mut() {
1174 trace.initiate_compaction()
1175 }
1176 }
1177}
1178
impl<C, B, T> StrictOperator<T> for Z1Trace<C, B, T>
where
    C: Circuit,
    B: Batch<Key = T::Key, Val = T::Val, Time = (), R = T::R>,
    T: Trace,
{
    /// Returns the stored trace with its dirty flag cleared.
    ///
    /// In replay mode this additionally writes one batch to the replay
    /// stream: the next chunk of the replayed trace (at most
    /// `splitter_output_chunk_size()` values) when a flush was requested and
    /// a replay is in progress, or an empty batch otherwise.
    ///
    /// # Panics
    ///
    /// Panics if no trace is stored, or if the operator is in replay mode
    /// but no replay stream has been prepared.
    fn get_output(&mut self) -> T {
        let replay_step_size = splitter_output_chunk_size();

        if self.replay_mode {
            if self.flush_output
                && let Some(replay) = &mut self.replay_state
            {
                let mut builder = <B::Builder as Builder<B>>::with_capacity(
                    &self.batch_factories,
                    replay_step_size,
                    replay_step_size,
                );

                let mut num_values = 0;
                // Scratch weight, reused for every (key, value) pair.
                let mut weight = self.batch_factories.weight_factory().default_box();

                while replay.borrow_cursor().key_valid() && num_values < replay_step_size {
                    let mut values_added = false;
                    while replay.borrow_cursor().val_valid() && num_values < replay_step_size {
                        // Sum the weights over all timestamps of the current
                        // (key, value) pair.
                        weight.set_zero();
                        replay.with_cursor_mut(|cursor| {
                            cursor.map_times(&mut |_t, w| weight.add_assign(w))
                        });

                        // Pairs whose weights cancel out are skipped.
                        if !weight.is_zero() {
                            builder.push_val_diff(replay.borrow_cursor().val(), weight.as_ref());
                            values_added = true;
                            num_values += 1;
                        }
                        replay.with_cursor_mut(|cursor| cursor.step_val());
                    }
                    // Values are pushed before their key.
                    if values_added {
                        builder.push_key(replay.borrow_cursor().key());
                    }
                    // Only move to the next key once all values of the
                    // current key were consumed; if the chunk limit was hit
                    // mid-key, the next call continues with the same key.
                    if !replay.borrow_cursor().val_valid() {
                        replay.with_cursor_mut(|cursor| cursor.step_key());
                    }
                }

                let batch = builder.done();
                self.delta_stream.as_ref().unwrap().value().put(batch);
                // The flush request stays pending until the cursor is
                // exhausted, at which point the replay state is dropped.
                if !replay.borrow_cursor().key_valid() {
                    self.replay_state = None;
                    self.flush_output = false;
                }
            } else {
                // Nothing to replay in this step: emit an empty batch.
                self.delta_stream
                    .as_ref()
                    .unwrap()
                    .value()
                    .put(B::dyn_empty(&self.batch_factories));
                self.flush_output = false;
            }
        } else {
            self.flush_output = false;
        }

        let mut result = self.trace.take().unwrap();
        result.clear_dirty_flag();
        result
    }

    /// Returns the final output; same as `get_output`.
    ///
    /// # Panics
    ///
    /// Panics if the operator was created with `reset_on_clock_start ==
    /// false`.
    fn get_final_output(&mut self) -> T {
        assert!(self.reset_on_clock_start);
        self.get_output()
    }
}
1259
1260impl<C, B, T> StrictUnaryOperator<T, T> for Z1Trace<C, B, T>
1261where
1262 C: Circuit,
1263 B: Batch<Key = T::Key, Val = T::Val, Time = (), R = T::R>,
1264 T: Trace,
1265{
1266 fn flush_input(&mut self) {
1267 self.flush_input = true;
1268 }
1269
1270 fn is_flush_input_complete(&self) -> bool {
1271 !self.flush_input
1272 }
1273
1274 async fn eval_strict(&mut self, _i: &T) {
1275 unimplemented!()
1276 }
1277
1278 async fn eval_strict_owned(&mut self, mut i: T) {
1279 if self.flush_input {
1282 self.time = self.time.advance(0);
1283 self.flush_input = false;
1284 }
1285
1286 let dirty = i.dirty();
1287
1288 if let Some(filter) = self.bounds.effective_key_filter() {
1289 i.retain_keys(filter);
1290 }
1291
1292 if let Some(filter) = self.bounds.effective_val_filter() {
1293 i.retain_values(filter);
1294 }
1295
1296 self.trace = Some(i);
1297
1298 self.dirty[0] = dirty;
1299 if dirty {
1300 self.dirty.fill(true);
1301 }
1302 }
1303
1304 fn input_preference(&self) -> OwnershipPreference {
1305 OwnershipPreference::PREFER_OWNED
1306 }
1307}
1308
1309#[cfg(test)]
1310mod test {
1311 use std::{
1312 cmp::max,
1313 collections::{BTreeMap, BTreeSet},
1314 thread,
1315 time::{Duration, Instant},
1316 };
1317
1318 use crate::{
1319 OrdIndexedZSet, Runtime, Stream, TypedBox, ZWeight,
1320 circuit::{
1321 CircuitConfig, GlobalNodeId,
1322 metadata::{MetaItem, SPINE_BATCHES_COUNT},
1323 },
1324 dynamic::DynData,
1325 typed_batch::{self, IndexedZSetReader, Spine},
1326 utils::Tup2,
1327 };
1328 use proptest::{collection::vec, prelude::*};
1329 use size_of::SizeOf;
1330
1331 fn quasi_monotone_batches(
1332 key_window_size: i32,
1333 key_window_step: i32,
1334 val_window_size: i32,
1335 val_window_step: i32,
1336 max_tuples: usize,
1337 batches: usize,
1338 ) -> impl Strategy<Value = Vec<Vec<((i32, i32), ZWeight)>>> {
1339 (0..batches)
1340 .map(|i| {
1341 vec(
1342 (
1343 (
1344 i as i32 * key_window_step
1345 ..i as i32 * key_window_step + key_window_size,
1346 i as i32 * val_window_step
1347 ..i as i32 * val_window_step + val_window_size,
1348 ),
1349 1..2i64,
1350 ),
1351 0..max_tuples,
1352 )
1353 })
1354 .collect::<Vec<_>>()
1355 }
1356
1357 fn test_integrate_trace_retain(
1363 batches: Vec<Vec<((i32, i32), ZWeight)>>,
1364 key_lateness: i32,
1365 val_lateness: i32,
1366 ) {
1367 let (mut dbsp, input_handle) = Runtime::init_circuit(4, move |circuit| {
1368 let (stream, handle) = circuit.add_input_indexed_zset::<i32, i32>();
1369 let stream = stream.shard();
1370 let watermark: Stream<_, TypedBox<(i32, i32), DynData>> = stream.waterline(
1371 || (i32::MIN, i32::MIN),
1372 |k, v| (*k, *v),
1373 |(ts1_left, ts2_left), (ts1_right, ts2_right)| {
1374 (max(*ts1_left, *ts1_right), max(*ts2_left, *ts2_right))
1375 },
1376 );
1377
1378 let trace = stream.integrate_trace();
1380
1381 let stream1 = stream.map_index(|(k, v)| (*k, *v)).shard();
1383 let trace1 = stream.integrate_trace();
1384 stream1.integrate_trace_retain_keys(&watermark, move |key, ts| {
1385 *key >= ts.0.saturating_sub(key_lateness)
1386 });
1387
1388 trace1.apply(|trace| {
1389 assert!(trace.size_of().total_bytes() < 100_000);
1391 });
1392
1393 let stream2 = stream.map_index(|(k, v)| (*k, *v)).shard();
1395
1396 let trace2 = stream2.integrate_trace();
1397 stream2.integrate_trace_retain_values(&watermark, move |val, ts| {
1398 *val >= ts.1.saturating_sub(val_lateness)
1399 });
1400
1401 trace2.apply(|trace| {
1402 assert!(trace.size_of().total_bytes() < 100_000);
1404 });
1405
1406 let stream3 = stream.map_index(|(k, v)| (*k, *v)).shard();
1408
1409 let trace3 = stream3.integrate_trace();
1410 stream3.integrate_trace_retain_values_last_n(
1411 &watermark,
1412 move |val, ts| *val >= ts.1.saturating_sub(val_lateness),
1413 1,
1414 );
1415
1416 let stream4 = stream.map_index(|(k, v)| (*k, *v)).shard();
1418
1419 let trace4 = stream4.integrate_trace();
1420 stream4.integrate_trace_retain_values_last_n(
1421 &watermark,
1422 move |val, ts| *val >= ts.1.saturating_sub(val_lateness),
1423 3,
1424 );
1425
1426 let stream5 = stream.map_index(|(k, v)| (*k, *v)).shard();
1428
1429 let trace5 = stream5.integrate_trace();
1430 stream5.integrate_trace_retain_values_top_n(
1431 &watermark,
1432 move |val, ts| *val >= ts.1.saturating_sub(val_lateness),
1433 1,
1434 );
1435
1436 let stream6 = stream.map_index(|(k, v)| (*k, *v)).shard();
1438
1439 let trace6 = stream6.integrate_trace();
1440 stream6.integrate_trace_retain_values_top_n(
1441 &watermark,
1442 move |val, ts| *val >= ts.1.saturating_sub(val_lateness),
1443 1,
1444 );
1445
1446 trace.apply3(&trace1, &watermark, move |trace, trace1, ts| {
1448 let expected = typed_batch::IndexedZSetReader::iter(trace.as_ref())
1449 .filter(|(k, _v, _w)| *k >= ts.0.saturating_sub(key_lateness))
1450 .collect::<BTreeSet<_>>();
1451 let actual =
1452 typed_batch::IndexedZSetReader::iter(trace1.as_ref()).collect::<BTreeSet<_>>();
1453
1454 let missing = expected.difference(&actual).collect::<Vec<_>>();
1455 assert!(
1456 missing.is_empty(),
1457 "missing tuples in trace1: {:?}",
1458 missing
1459 );
1460 });
1461
1462 trace.apply3(&trace2, &watermark, move |trace, trace2, ts| {
1463 let expected = typed_batch::IndexedZSetReader::iter(trace.as_ref())
1464 .filter(|(_k, v, _w)| *v >= ts.1.saturating_sub(val_lateness))
1465 .collect::<BTreeSet<_>>();
1466 let actual =
1467 typed_batch::IndexedZSetReader::iter(trace2.as_ref()).collect::<BTreeSet<_>>();
1468
1469 let missing = expected.difference(&actual).collect::<Vec<_>>();
1470 assert!(
1471 missing.is_empty(),
1472 "missing tuples in trace2: {:?}",
1473 missing
1474 );
1475 });
1476
1477 fn test_retain_values_last_n(
1478 trace: &Spine<OrdIndexedZSet<i32, i32>>,
1479 watermark: &TypedBox<(i32, i32), DynData>,
1480 val_lateness: i32,
1481 n: usize,
1482 ) -> BTreeSet<(i32, i32, ZWeight)> {
1483 let mut all_tuples = BTreeMap::new();
1484 typed_batch::IndexedZSetReader::iter(trace).for_each(|(k, v, w)| {
1485 all_tuples
1486 .entry(k)
1487 .or_insert_with(|| Vec::new())
1488 .push((v, w))
1489 });
1490 let mut expected = BTreeSet::new();
1491 for (k, mut tuples) in all_tuples.into_iter() {
1492 let index = tuples
1493 .iter()
1494 .position(|(v, _w)| *v >= watermark.1.saturating_sub(val_lateness))
1495 .unwrap_or(tuples.len());
1496 let first_index = index.saturating_sub(n);
1497 tuples.drain(first_index..).for_each(|(v, w)| {
1498 let _ = expected.insert((k, v, w));
1499 });
1500 }
1501 expected
1502 }
1503
1504 fn test_retain_values_top_n(
1505 trace: &Spine<OrdIndexedZSet<i32, i32>>,
1506 watermark: &TypedBox<(i32, i32), DynData>,
1507 val_lateness: i32,
1508 n: usize,
1509 ) -> BTreeSet<(i32, i32, ZWeight)> {
1510 let mut all_tuples = BTreeMap::new();
1511 typed_batch::IndexedZSetReader::iter(trace).for_each(|(k, v, w)| {
1512 all_tuples
1513 .entry(k)
1514 .or_insert_with(|| Vec::new())
1515 .push((v, w))
1516 });
1517
1518 let mut expected = BTreeSet::new();
1519 for (k, mut tuples) in all_tuples.into_iter() {
1520 tuples.retain(|(v, _w)| *v >= watermark.1.saturating_sub(val_lateness));
1521 let first_index = tuples.len().saturating_sub(n);
1522 tuples.drain(first_index..).for_each(|(v, w)| {
1523 let _ = expected.insert((k, v, w));
1524 });
1525 }
1526 expected
1527 }
1528
1529 trace.apply3(&trace3, &watermark, move |trace, trace3, ts| {
1530 let mut all_tuples = BTreeMap::new();
1531 typed_batch::IndexedZSetReader::iter(trace.as_ref()).for_each(|(k, v, w)| {
1532 all_tuples
1533 .entry(k)
1534 .or_insert_with(|| Vec::new())
1535 .push((v, w))
1536 });
1537
1538 let expected =
1539 test_retain_values_last_n(trace.as_ref(), ts.as_ref(), val_lateness, 1);
1540
1541 let actual =
1542 typed_batch::IndexedZSetReader::iter(trace3.as_ref()).collect::<BTreeSet<_>>();
1543
1544 let missing = expected.difference(&actual).collect::<Vec<_>>();
1545 assert!(
1546 missing.is_empty(),
1547 "missing tuples in trace3: {:?}",
1548 missing
1549 );
1550 });
1551
1552 trace.apply3(&trace4, &watermark, move |trace, trace4, ts| {
1553 let mut all_tuples = BTreeMap::new();
1554 typed_batch::IndexedZSetReader::iter(trace.as_ref()).for_each(|(k, v, w)| {
1555 all_tuples
1556 .entry(k)
1557 .or_insert_with(|| Vec::new())
1558 .push((v, w))
1559 });
1560
1561 let expected =
1562 test_retain_values_last_n(trace.as_ref(), ts.as_ref(), val_lateness, 3);
1563
1564 let actual =
1565 typed_batch::IndexedZSetReader::iter(trace4.as_ref()).collect::<BTreeSet<_>>();
1566
1567 let missing = expected.difference(&actual).collect::<Vec<_>>();
1568 assert!(
1569 missing.is_empty(),
1570 "missing tuples in trace4: {:?}",
1571 missing
1572 );
1573 });
1574
1575 trace.apply3(&trace5, &watermark, move |trace, trace5, ts| {
1576 let mut all_tuples = BTreeMap::new();
1577 typed_batch::IndexedZSetReader::iter(trace.as_ref()).for_each(|(k, v, w)| {
1578 all_tuples
1579 .entry(k)
1580 .or_insert_with(|| Vec::new())
1581 .push((v, w))
1582 });
1583
1584 let expected =
1585 test_retain_values_top_n(trace.as_ref(), ts.as_ref(), val_lateness, 1);
1586
1587 let actual =
1588 typed_batch::IndexedZSetReader::iter(trace5.as_ref()).collect::<BTreeSet<_>>();
1589
1590 let missing = expected.difference(&actual).collect::<Vec<_>>();
1591 assert!(
1592 missing.is_empty(),
1593 "missing tuples in trace5: {:?}",
1594 missing
1595 );
1596 });
1597
1598 trace.apply3(&trace6, &watermark, move |trace, trace6, ts| {
1599 let mut all_tuples = BTreeMap::new();
1600 typed_batch::IndexedZSetReader::iter(trace.as_ref()).for_each(|(k, v, w)| {
1601 all_tuples
1602 .entry(k)
1603 .or_insert_with(|| Vec::new())
1604 .push((v, w))
1605 });
1606
1607 let expected =
1608 test_retain_values_top_n(trace.as_ref(), ts.as_ref(), val_lateness, 3);
1609
1610 let actual =
1611 typed_batch::IndexedZSetReader::iter(trace6.as_ref()).collect::<BTreeSet<_>>();
1612
1613 let missing = expected.difference(&actual).collect::<Vec<_>>();
1614 assert!(
1615 missing.is_empty(),
1616 "missing tuples in trace6: {:?}",
1617 missing
1618 );
1619 });
1620
1621 Ok(handle)
1622 })
1623 .unwrap();
1624
1625 for batch in batches {
1626 let mut tuples = batch
1627 .into_iter()
1628 .map(|((k, v), r)| Tup2(k, Tup2(v, r)))
1629 .collect::<Vec<_>>();
1630 input_handle.append(&mut tuples);
1631 dbsp.transaction().unwrap();
1632 }
1633 }
1634
/// Runs the retention test on an input that keeps inserting and retracting.
///
/// For each `i` in `0..1000`, one batch inserts `(i, i)` through `(i, i + 4)`
/// and the next batch retracts `(i, i + 1)` through `(i, i + 4)`, so only
/// `(i, i)` survives for each key.
#[test]
fn test_integrate_trace_retain_with_deletions() {
    let batches: Vec<Vec<((i32, i32), ZWeight)>> = (0..1000)
        .flat_map(|i| {
            let insertions = (0..5)
                .map(|offset| ((i, i + offset), 1))
                .collect::<Vec<_>>();
            let retractions = (1..5)
                .map(|offset| ((i, i + offset), -1))
                .collect::<Vec<_>>();
            [insertions, retractions]
        })
        .collect();

    test_integrate_trace_retain(batches, 10, 10);
}
1659
1660 fn spine_batches_count_values(profile: &crate::profile::DbspProfile) -> Vec<(String, usize)> {
1661 let root = GlobalNodeId::root();
1662 let operator_names = spine_operator_names(profile);
1663
1664 let mut counts = profile
1666 .worker_profiles
1667 .iter()
1668 .flat_map(|profile| profile.attribute_profile(&SPINE_BATCHES_COUNT))
1669 .filter_map(|(node_id, value)| {
1670 (node_id != root).then(|| match value {
1671 MetaItem::Count(count) => {
1672 let operator_name = operator_names
1673 .get(&node_id.node_identifier().to_string())
1674 .cloned()
1675 .unwrap_or_else(|| node_id.to_string());
1676 (operator_name, count)
1677 }
1678 value => panic!("spine_batches_count must be serialized as a count: {value:?}"),
1679 })
1680 })
1681 .collect::<Vec<_>>();
1682
1683 counts.sort();
1684 counts
1685 }
1686
1687 fn spine_operator_names(profile: &crate::profile::DbspProfile) -> BTreeMap<String, String> {
1688 let mut operator_names = BTreeMap::new();
1689
1690 if let Some(graph) = &profile.graph {
1691 let graph_json: serde_json::Value = serde_json::from_str(&graph.to_json()).unwrap();
1692 collect_operator_names(&graph_json, &mut operator_names);
1693 }
1694
1695 operator_names
1696 }
1697
1698 fn collect_operator_names(
1699 value: &serde_json::Value,
1700 operator_names: &mut BTreeMap<String, String>,
1701 ) {
1702 match value {
1703 serde_json::Value::Object(object) => {
1704 if let (Some(id), Some(label)) = (
1705 object.get("id").and_then(serde_json::Value::as_str),
1706 object.get("label").and_then(serde_json::Value::as_str),
1707 ) {
1708 let name = label
1709 .split("\\l")
1710 .next()
1711 .unwrap_or(label)
1712 .split(" @ ")
1713 .next()
1714 .unwrap_or(label);
1715 operator_names.insert(id.to_owned(), name.to_owned());
1716 }
1717
1718 for value in object.values() {
1719 collect_operator_names(value, operator_names);
1720 }
1721 }
1722 serde_json::Value::Array(array) => {
1723 for value in array {
1724 collect_operator_names(value, operator_names);
1725 }
1726 }
1727 _ => {}
1728 }
1729 }
1730
/// Feeds growing numbers of batches into several accumulated traces, requests
/// compaction, waits until every reported `SPINE_BATCHES_COUNT` is at most 1,
/// and then checks that each trace still holds exactly the tuples fed so far.
#[test]
fn test_start_compaction_accumulate_integrate_trace() {
    const NUM_TRACES: usize = 3;
    const ITERATIONS: usize = 3;
    const BATCHES_PER_ITERATION: usize = 20;
    // Iteration `i` feeds `(i + 1) * BATCHES_PER_ITERATION` batches, so the
    // grand total is a triangular sum.
    const TOTAL_BATCHES: usize = BATCHES_PER_ITERATION * ITERATIONS * (ITERATIONS + 1) / 2;
    const RECORDS_PER_BATCH: usize = 2000;

    /// Key of record `record` in batch `batch` of trace `trace`; each trace
    /// owns a disjoint key range and keys grow with the batch and the record.
    fn key_of(trace: usize, batch: usize, record: usize) -> i32 {
        (trace * TOTAL_BATCHES * RECORDS_PER_BATCH + batch * RECORDS_PER_BATCH + record) as i32
    }

    let storage_dir = tempfile::tempdir().unwrap();
    let circuit_config =
        CircuitConfig::with_workers(2).with_temporary_storage(storage_dir.path());
    let (mut dbsp, (input_handles, trace_outputs)) =
        Runtime::init_circuit(circuit_config, move |circuit| {
            // One independent input/trace/output triple per trace.
            let (input_handles, trace_outputs): (Vec<_>, Vec<_>) = (0..NUM_TRACES)
                .map(|_| {
                    let (stream, handle) = circuit.add_input_indexed_zset::<i32, i32>();
                    let output = stream
                        .shard()
                        .accumulate_integrate_trace()
                        .apply(|trace| trace.ro_snapshot().consolidate())
                        .output();
                    (handle, output)
                })
                .unzip();

            Ok((input_handles, trace_outputs))
        })
        .unwrap();

    let mut batches_fed = 0;
    for iteration in 0..ITERATIONS {
        let end_batch = batches_fed + (iteration + 1) * BATCHES_PER_ITERATION;

        // Feed this iteration's batches, one transaction per batch.
        for batch in batches_fed..end_batch {
            for (trace, input_handle) in input_handles.iter().enumerate() {
                let mut tuples = (0..RECORDS_PER_BATCH)
                    .map(|record| Tup2(key_of(trace, batch, record), Tup2(record as i32, 1)))
                    .collect::<Vec<_>>();
                input_handle.append(&mut tuples);
            }

            dbsp.transaction().unwrap();
        }
        batches_fed = end_batch;

        let counts = spine_batches_count_values(&dbsp.retrieve_profile().unwrap());
        println!(
            "SPINE_BATCHES_COUNT values before compaction iteration {iteration}: {counts:?}"
        );
        assert!(
            !counts.is_empty(),
            "profile should contain at least one SPINE_BATCHES_COUNT metric"
        );

        // Request compaction twice, shortly apart.
        dbsp.start_compaction().unwrap();
        thread::sleep(Duration::from_millis(100));
        dbsp.start_compaction().unwrap();

        // Poll the profile until every spine reports at most one batch.
        let deadline = Instant::now() + Duration::from_secs(60);
        loop {
            let counts = spine_batches_count_values(&dbsp.retrieve_profile().unwrap());
            println!(
                "SPINE_BATCHES_COUNT values after compaction request iteration {iteration}: {counts:?}"
            );
            assert!(
                !counts.is_empty(),
                "profile should contain at least one SPINE_BATCHES_COUNT metric"
            );

            let fully_compacted = counts.iter().all(|(_, count)| *count <= 1);
            if fully_compacted {
                break;
            }

            assert!(
                Instant::now() < deadline,
                "timed out waiting for all SPINE_BATCHES_COUNT values to be <= 1: {counts:?}"
            );
            thread::sleep(Duration::from_secs(5));
        }

        // Run a transaction with no new input, then verify that every trace
        // holds exactly the tuples fed so far, in key order.
        dbsp.transaction().unwrap();
        for (trace, output) in trace_outputs.iter().enumerate() {
            let snapshot = output.consolidate();
            let mut actual_tuples = snapshot.iter();

            for batch in 0..batches_fed {
                for record in 0..RECORDS_PER_BATCH {
                    let expected = (key_of(trace, batch, record), record as i32, 1);
                    assert_eq!(actual_tuples.next(), Some(expected));
                }
            }

            assert_eq!(actual_tuples.next(), None);
        }
    }
}
1849
1850 proptest! {
1851 #![proptest_config(ProptestConfig::with_cases(16))]
1852
1853 #[test]
1854 fn test_integrate_trace_retain_proptest(batches in quasi_monotone_batches(100, 20, 1000, 200, 100, 200)) {
1855 test_integrate_trace_retain(batches, 100, 1000);
1856 }
1857 }
1858}