Skip to main content

dbsp/operator/
star_join_macros.rs

/// Expands to an array of callback-macro invocations, one for every position at which the
/// pivot expression can be inserted into the ordered list of the other expressions.
///
/// Invocation shape:
///
/// ```text
/// generate_all_join_orderings!(path::to::callback, [fixed tokens], pivot, o1, o2, ..., oN)
/// ```
///
/// At least one `o` expression is required; a trailing comma is accepted. The result is an
/// array literal with `N + 1` elements:
///
/// ```text
/// [
///     callback!(fixed tokens pivot, o1, o2, ..., oN),
///     callback!(fixed tokens o1, pivot, o2, ..., oN),
///     ...
///     callback!(fixed tokens o1, o2, ..., oN, pivot),
/// ]
/// ```
///
/// The fixed tokens are pasted verbatim in front of every argument list, so they must
/// carry their own trailing comma when non-empty. The relative order of `o1..oN` never
/// changes; only the pivot moves.
///
/// The `@collect` arms are an implementation detail and are not meant to be invoked
/// directly.
#[macro_export]
macro_rules! generate_all_join_orderings {
    // Public entry point: start with an empty output list and an empty "already passed" list.
    ($($callee:tt)::+, [$($shared:tt)*], $pivot:expr, $($others:expr),+ $(,)?) => {
        $crate::generate_all_join_orderings!(
            @collect $($callee)::+, [$($shared)*], $pivot, []; []; $($others),+
        )
    };
    // Terminal step: nothing is left after the second `;`, so the pivot goes last and the
    // accumulated invocations are emitted as the final array.
    (@collect $($callee:tt)::+, [$($shared:tt)*], $pivot:expr,
        [$($done:expr,)*]; [$($before:expr,)*];
    ) => {
        [$($done,)* $($callee)::+!($($shared)* $($before,)* $pivot)]
    };
    // Recursive step: emit the ordering with the pivot placed right before `$next`, then
    // move `$next` to the "already passed" list and continue with the remainder.
    (@collect $($callee:tt)::+, [$($shared:tt)*], $pivot:expr,
        [$($done:expr,)*]; [$($before:expr,)*];
        $next:expr $(, $after:expr)*
    ) => {
        $crate::generate_all_join_orderings!(
            @collect
            $($callee)::+,
            [$($shared)*],
            $pivot,
            [$($done,)* $($callee)::+!($($shared)* $($before,)* $pivot, $next $(, $after)*),];
            [$($before,)* $next,];
            $($after),*
        )
    };
}
26
/// Counts comma-separated token trees.
///
/// Expands to a `usize` expression built only from literals (`1usize + ... + 0usize`), so
/// it can be used where a constant is required, e.g. as part of an array length.
///
/// Each argument must be a single token tree (an identifier, a literal, or a delimited
/// group such as `(a - 2)`); arguments are only counted, never evaluated. An optional
/// trailing comma is accepted, matching the list syntax of the other macros in this file.
///
/// The macro recurses once per argument, so very long lists are bounded by the compiler's
/// macro recursion limit.
#[macro_export]
macro_rules! count_tts {
    () => {
        0usize
    };
    // `$head` is matched only to consume one element; the count comes from the recursion.
    ($head:tt $(, $tail:tt)* $(,)?) => {
        1usize + $crate::count_tts!($($tail),*)
    };
}
36
/// Builds one boxed join callback that produces key/value pairs.
///
/// Expands to a block expression that evaluates to a `Box`ed `move` closure with the
/// parameters `($prefix_cursor, $trace_cursors, cb)`. On every call the closure
///
/// 1. invokes a clone of `$join_func` (cloned once, when the block is evaluated) with the
///    downcast key of `$prefix_cursor` followed by the `$vals` expressions in the order given;
/// 2. iterates over the `(key, value)` pairs that call returns;
/// 3. moves each pair into two type-erased buffers that are reused across calls and hands
///    mutable references to them to `cb`.
///
/// The `$vals` expressions are evaluated inside the closure, so they may refer to
/// `$prefix_cursor` and `$trace_cursors`.
///
/// The expansion names the types `OK` and `OV` unqualified; both must be in scope where
/// the macro is expanded (for example as generic parameters of the enclosing function).
#[macro_export]
macro_rules! build_star_join_index_func {
    ($join_func:ident, $prefix_cursor:ident, $trace_cursors:ident, $($vals:expr),+ $(,)?) => {{
        // `DynData` is imported here so that this exported macro does not depend on the
        // caller having it in scope.
        use $crate::{dynamic::{DowncastTrait, DynData, Erase}, trace::Cursor};

        // Output buffers, allocated once and overwritten for every produced pair.
        let mut ok: Box<DynData> = Box::<OK>::default().erase_box();
        let mut ov: Box<DynData> = Box::<OV>::default().erase_box();
        let join_func = $join_func.clone();

        Box::new(move |$prefix_cursor, $trace_cursors, cb| {
            for (k, v) in join_func(
                // NOTE(review): the downcast is unchecked here; it presumably relies on the
                // cursor's key type matching what `join_func` expects — confirm at call sites.
                unsafe { $prefix_cursor.key().downcast() },
                $($vals),+
            ) {
                // The buffers were created from `OK` / `OV` above, which is what these
                // downcasts target when `k: OK` and `v: OV`.
                *unsafe { ok.downcast_mut() } = k;
                *unsafe { ov.downcast_mut() } = v;
                cb(ok.as_mut(), ov.as_mut());
            }
        })
    }};
}
58
/// Builds one boxed join callback that produces zero or more values per call.
///
/// Expands to a block expression that evaluates to a `Box`ed `move` closure with the
/// parameters `($prefix_cursor, $trace_cursors, cb)`. On every call the closure invokes a
/// clone of `$join_func` (cloned once, when the block is evaluated) with the downcast key
/// of `$prefix_cursor` followed by the `$vals` expressions, iterates over the returned
/// values, moves each one into a reusable type-erased buffer and passes it to `cb`
/// together with an erased unit value `()`.
///
/// The `$vals` expressions are evaluated inside the closure, so they may refer to
/// `$prefix_cursor` and `$trace_cursors`.
///
/// The expansion names the type `OV` unqualified; it must be in scope where the macro is
/// expanded (for example as a generic parameter of the enclosing function).
#[macro_export]
macro_rules! build_star_join_flatmap_func {
    ($join_func:ident, $prefix_cursor:ident, $trace_cursors:ident, $($vals:expr),+ $(,)?) => {{
        // `DynData` is imported here so that this exported macro does not depend on the
        // caller having it in scope.
        use $crate::dynamic::{DowncastTrait, DynData, Erase};
        use $crate::trace::Cursor;

        // Output buffer, allocated once and overwritten for every produced value.
        let mut ov: Box<DynData> = Box::<OV>::default().erase_box();
        let join_func = $join_func.clone();

        Box::new(move |$prefix_cursor, $trace_cursors, cb| {
            for v in join_func(
                // NOTE(review): the downcast is unchecked here; it presumably relies on the
                // cursor's key type matching what `join_func` expects — confirm at call sites.
                unsafe { $prefix_cursor.key().downcast() },
                $($vals),+
            ) {
                // The buffer was created from `OV` above, which is what this downcast
                // targets when `v: OV`.
                *unsafe { ov.downcast_mut() } = v;
                cb(ov.as_mut(), ().erase_mut());
            }
        })
    }};
}
79
/// Builds one boxed join callback that produces exactly one value per call.
///
/// Expands to a block expression that evaluates to a `Box`ed `move` closure with the
/// parameters `($prefix_cursor, $trace_cursors, cb)`. On every call the closure invokes a
/// clone of `$join_func` (cloned once, when the block is evaluated) with the downcast key
/// of `$prefix_cursor` followed by the `$vals` expressions, moves the returned value into
/// a reusable type-erased buffer and passes it to `cb` together with an erased unit
/// value `()`.
///
/// The `$vals` expressions are evaluated inside the closure, so they may refer to
/// `$prefix_cursor` and `$trace_cursors`.
///
/// The expansion names the type `OV` unqualified; it must be in scope where the macro is
/// expanded (for example as a generic parameter of the enclosing function).
#[macro_export]
macro_rules! build_star_join_func {
    ($join_func:ident, $prefix_cursor:ident, $trace_cursors:ident, $($vals:expr),+ $(,)?) => {{
        // `DynData` is imported here so that this exported macro does not depend on the
        // caller having it in scope.
        use $crate::dynamic::{DowncastTrait, DynData, Erase};
        use $crate::trace::Cursor;

        // Output buffer, allocated once and overwritten on every call.
        let mut ov: Box<DynData> = Box::<OV>::default().erase_box();
        let join_func = $join_func.clone();

        Box::new(move |$prefix_cursor, $trace_cursors, cb| {
            let v = join_func(
                // NOTE(review): the downcast is unchecked here; it presumably relies on the
                // cursor's key type matching what `join_func` expects — confirm at call sites.
                unsafe { $prefix_cursor.key().downcast() },
                $($vals),+
            );
            // The buffer was created from `OV` above, which is what this downcast targets
            // when `v: OV`.
            *unsafe { ov.downcast_mut() } = v;
            cb(ov.as_mut(), ().erase_mut());
        })
    }};
}
99
/// Expands to the array of key/value-producing join callbacks for a star join.
///
/// Arguments: the join function identifier, the identifier to use for the prefix cursor
/// parameter, the identifier to use for the trace cursors parameter, and a bracketed,
/// non-empty list of index tokens.
///
/// The expansion delegates to `generate_all_join_orderings!` with
/// `build_star_join_index_func` as the callback. The pivot expression is the downcast
/// value of the prefix cursor; the remaining expressions are the downcast values of
/// `traces[idx]`, one per index token. The result therefore has one more element than
/// there are index tokens.
#[macro_export]
macro_rules! star_join_index_funcs {
    ($f:ident, $prefix:ident, $traces:ident, [$($idx:tt),+ $(,)?]) => {
        $crate::generate_all_join_orderings!(
            $crate::build_star_join_index_func,
            [$f, $prefix, $traces,],
            unsafe { $prefix.val().downcast() },
            $(unsafe { $traces[$idx].val().downcast() }),+
        )
    };
}
116
/// Expands to the array of multi-value join callbacks for a star join.
///
/// Arguments: the join function identifier, the identifier to use for the prefix cursor
/// parameter, the identifier to use for the trace cursors parameter, and a bracketed,
/// non-empty list of index tokens.
///
/// The expansion delegates to `generate_all_join_orderings!` with
/// `build_star_join_flatmap_func` as the callback. The pivot expression is the downcast
/// value of the prefix cursor; the remaining expressions are the downcast values of
/// `traces[idx]`, one per index token. The result therefore has one more element than
/// there are index tokens.
#[macro_export]
macro_rules! star_join_flatmap_funcs {
    ($f:ident, $prefix:ident, $traces:ident, [$($idx:tt),+ $(,)?]) => {
        $crate::generate_all_join_orderings!(
            $crate::build_star_join_flatmap_func,
            [$f, $prefix, $traces,],
            unsafe { $prefix.val().downcast() },
            $(unsafe { $traces[$idx].val().downcast() }),+
        )
    };
}
133
/// Expands to the array of single-value join callbacks for a star join.
///
/// Arguments: the join function identifier, the identifier to use for the prefix cursor
/// parameter, the identifier to use for the trace cursors parameter, and a bracketed,
/// non-empty list of index tokens.
///
/// The expansion delegates to `generate_all_join_orderings!` with `build_star_join_func`
/// as the callback. The pivot expression is the downcast value of the prefix cursor; the
/// remaining expressions are the downcast values of `traces[idx]`, one per index token.
/// The result therefore has one more element than there are index tokens.
#[macro_export]
macro_rules! star_join_funcs {
    ($f:ident, $prefix:ident, $traces:ident, [$($idx:tt),+ $(,)?]) => {
        $crate::generate_all_join_orderings!(
            $crate::build_star_join_func,
            [$f, $prefix, $traces,],
            unsafe { $prefix.val().downcast() },
            $(unsafe { $traces[$idx].val().downcast() }),+
        )
    };
}
150
/// Expands to the body of an inner star-join-index method on a `NestedCircuit` stream.
///
/// Arguments, in order:
///
/// * the leading stream expression;
/// * a bracketed, non-empty list of the remaining stream expressions;
/// * the identifier of the join function;
/// * a bracketed list of value types; a pair of `BatchReaderFactories` over the key type
///   and that value type is registered for each entry;
/// * a bracketed list of index tokens, used to index the trace cursors in the callbacks;
/// * the key type;
/// * the output key type and the output value type.
///
/// The expansion builds the callback array with `star_join_index_funcs!`, fills a
/// `StarJoinFactories`, and calls `dyn_inner_star_join_index_mono` on the leading stream's
/// `.inner()`, finishing with `.typed()`.
///
/// `ZWeight` is used unqualified and must be in scope where the macro is expanded, as must
/// the type names `OK` and `OV` required by the generated callbacks.
#[macro_export]
macro_rules! inner_star_join_index_body {
    (
        $lead:expr,
        [$($other:expr),+ $(,)?],
        $f:ident,
        [$($val:ty),+ $(,)?],
        [$($idx:tt),+ $(,)?],
        $key:ty,
        $out_key:ty,
        $out_val:ty
    ) => {{
        use $crate::NestedCircuit;
        use $crate::dynamic::DynData;
        use $crate::operator::dynamic::MonoIndexedZSet;
        use $crate::operator::dynamic::multijoin::{StarJoinFactories, StarJoinFunc};
        use $crate::trace::BatchReaderFactories;

        // One callback more than there are index tokens, matching what
        // `generate_all_join_orderings!` emits.
        let funcs: [
            StarJoinFunc<NestedCircuit, MonoIndexedZSet, DynData, DynData>;
            $crate::count_tts!($($idx),+) + 1
        ] = $crate::star_join_index_funcs!($f, prefix_cursor, trace_cursors, [$($idx),+]);

        let mut factories = StarJoinFactories::new::<$out_key, $out_val>();
        $(
            factories.add_input_factories(
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
            );
        )+

        $lead
            .inner()
            .dyn_inner_star_join_index_mono(&factories, &[$($other.inner()),+], &funcs)
            .typed()
    }};
}
197
/// Expands to the body of an inner star-join-index method on a `RootCircuit` stream.
///
/// Arguments, in order:
///
/// * the leading stream expression;
/// * a bracketed, non-empty list of the remaining stream expressions;
/// * the identifier of the join function;
/// * a bracketed list of value types; a pair of `BatchReaderFactories` over the key type
///   and that value type is registered for each entry;
/// * a bracketed list of index tokens, used to index the trace cursors in the callbacks;
/// * the key type;
/// * the output key type and the output value type.
///
/// The expansion builds the callback array with `star_join_index_funcs!`, fills a
/// `StarJoinFactories`, and calls `dyn_inner_star_join_index_mono` on the leading stream's
/// `.inner()`, finishing with `.typed()`.
///
/// `ZWeight` is used unqualified and must be in scope where the macro is expanded, as must
/// the type names `OK` and `OV` required by the generated callbacks.
#[macro_export]
macro_rules! inner_star_join_index_root_body {
    (
        $lead:expr,
        [$($other:expr),+ $(,)?],
        $f:ident,
        [$($val:ty),+ $(,)?],
        [$($idx:tt),+ $(,)?],
        $key:ty,
        $out_key:ty,
        $out_val:ty
    ) => {{
        use $crate::RootCircuit;
        use $crate::dynamic::DynData;
        use $crate::operator::dynamic::MonoIndexedZSet;
        use $crate::operator::dynamic::multijoin::{StarJoinFactories, StarJoinFunc};
        use $crate::trace::BatchReaderFactories;

        // One callback more than there are index tokens, matching what
        // `generate_all_join_orderings!` emits.
        let funcs: [
            StarJoinFunc<RootCircuit, MonoIndexedZSet, DynData, DynData>;
            $crate::count_tts!($($idx),+) + 1
        ] = $crate::star_join_index_funcs!($f, prefix_cursor, trace_cursors, [$($idx),+]);

        let mut factories = StarJoinFactories::new::<$out_key, $out_val>();
        $(
            factories.add_input_factories(
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
            );
        )+

        $lead
            .inner()
            .dyn_inner_star_join_index_mono(&factories, &[$($other.inner()),+], &funcs)
            .typed()
    }};
}
244
/// Expands to the body of a star-join-index method (with per-stream `saturate` flags) on a
/// `RootCircuit` stream.
///
/// Arguments, in order:
///
/// * the leading stream expression;
/// * a bracketed, non-empty list of `(stream, saturate)` expression pairs;
/// * the identifier of the join function;
/// * a bracketed list of value types; a pair of `BatchReaderFactories` over the key type
///   and that value type is registered for each entry;
/// * a bracketed list of index tokens, used to index the trace cursors in the callbacks;
/// * the key type;
/// * the output key type and the output value type.
///
/// The expansion builds the callback array with `star_join_index_funcs!`, fills a
/// `StarJoinFactories`, and calls `dyn_star_join_index_mono` on the leading stream's
/// `.inner()` with a slice of `(stream.inner(), saturate)` tuples, finishing with `.typed()`.
///
/// `ZWeight` is used unqualified and must be in scope where the macro is expanded, as must
/// the type names `OK` and `OV` required by the generated callbacks.
#[macro_export]
macro_rules! star_join_index_body {
    (
        $lead:expr,
        [$(($other:expr, $sat:expr)),+ $(,)?],
        $f:ident,
        [$($val:ty),+ $(,)?],
        [$($idx:tt),+ $(,)?],
        $key:ty,
        $out_key:ty,
        $out_val:ty
    ) => {{
        use $crate::RootCircuit;
        use $crate::dynamic::DynData;
        use $crate::operator::dynamic::MonoIndexedZSet;
        use $crate::operator::dynamic::multijoin::{StarJoinFactories, StarJoinFunc};
        use $crate::trace::BatchReaderFactories;

        // One callback more than there are index tokens, matching what
        // `generate_all_join_orderings!` emits.
        let funcs: [
            StarJoinFunc<RootCircuit, MonoIndexedZSet, DynData, DynData>;
            $crate::count_tts!($($idx),+) + 1
        ] = $crate::star_join_index_funcs!($f, prefix_cursor, trace_cursors, [$($idx),+]);

        let mut factories = StarJoinFactories::new::<$out_key, $out_val>();
        $(
            factories.add_input_factories(
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
            );
        )+

        $lead
            .inner()
            .dyn_star_join_index_mono(&factories, &[$(($other.inner(), $sat)),+], &funcs)
            .typed()
    }};
}
291
/// Expands to the body of an inner star-join-flatmap method on a `NestedCircuit` stream.
///
/// Arguments, in order:
///
/// * the leading stream expression;
/// * a bracketed, non-empty list of the remaining stream expressions;
/// * the identifier of the join function;
/// * a bracketed list of value types; a pair of `BatchReaderFactories` over the key type
///   and that value type is registered for each entry;
/// * a bracketed list of index tokens, used to index the trace cursors in the callbacks;
/// * the key type;
/// * the output value type (the output factories are created for `<output value, ()>`).
///
/// The expansion builds the callback array with `star_join_flatmap_funcs!`, fills a
/// `StarJoinFactories`, and calls `dyn_inner_star_join_mono` on the leading stream's
/// `.inner()`, finishing with `.typed()`.
///
/// `ZWeight` is used unqualified and must be in scope where the macro is expanded, as must
/// the type name `OV` required by the generated callbacks.
#[macro_export]
macro_rules! inner_star_join_flatmap_body {
    (
        $lead:expr,
        [$($other:expr),+ $(,)?],
        $f:ident,
        [$($val:ty),+ $(,)?],
        [$($idx:tt),+ $(,)?],
        $key:ty,
        $out_val:ty
    ) => {{
        use $crate::NestedCircuit;
        use $crate::dynamic::{DynData, DynUnit};
        use $crate::operator::dynamic::MonoIndexedZSet;
        use $crate::operator::dynamic::multijoin::{StarJoinFactories, StarJoinFunc};
        use $crate::trace::BatchReaderFactories;

        // One callback more than there are index tokens, matching what
        // `generate_all_join_orderings!` emits.
        let funcs: [
            StarJoinFunc<NestedCircuit, MonoIndexedZSet, DynData, DynUnit>;
            $crate::count_tts!($($idx),+) + 1
        ] = $crate::star_join_flatmap_funcs!($f, prefix_cursor, trace_cursors, [$($idx),+]);

        let mut factories = StarJoinFactories::new::<$out_val, ()>();
        $(
            factories.add_input_factories(
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
            );
        )+

        $lead
            .inner()
            .dyn_inner_star_join_mono(&factories, &[$($other.inner()),+], &funcs)
            .typed()
    }};
}
337
/// Expands to the body of an inner star-join-flatmap method on a `RootCircuit` stream.
///
/// Arguments, in order:
///
/// * the leading stream expression;
/// * a bracketed, non-empty list of the remaining stream expressions;
/// * the identifier of the join function;
/// * a bracketed list of value types; a pair of `BatchReaderFactories` over the key type
///   and that value type is registered for each entry;
/// * a bracketed list of index tokens, used to index the trace cursors in the callbacks;
/// * the key type;
/// * the output value type (the output factories are created for `<output value, ()>`).
///
/// The expansion builds the callback array with `star_join_flatmap_funcs!`, fills a
/// `StarJoinFactories`, and calls `dyn_inner_star_join_mono` on the leading stream's
/// `.inner()`, finishing with `.typed()`.
///
/// `ZWeight` is used unqualified and must be in scope where the macro is expanded, as must
/// the type name `OV` required by the generated callbacks.
#[macro_export]
macro_rules! inner_star_join_flatmap_root_body {
    (
        $lead:expr,
        [$($other:expr),+ $(,)?],
        $f:ident,
        [$($val:ty),+ $(,)?],
        [$($idx:tt),+ $(,)?],
        $key:ty,
        $out_val:ty
    ) => {{
        use $crate::RootCircuit;
        use $crate::dynamic::{DynData, DynUnit};
        use $crate::operator::dynamic::MonoIndexedZSet;
        use $crate::operator::dynamic::multijoin::{StarJoinFactories, StarJoinFunc};
        use $crate::trace::BatchReaderFactories;

        // One callback more than there are index tokens, matching what
        // `generate_all_join_orderings!` emits.
        let funcs: [
            StarJoinFunc<RootCircuit, MonoIndexedZSet, DynData, DynUnit>;
            $crate::count_tts!($($idx),+) + 1
        ] = $crate::star_join_flatmap_funcs!($f, prefix_cursor, trace_cursors, [$($idx),+]);

        let mut factories = StarJoinFactories::new::<$out_val, ()>();
        $(
            factories.add_input_factories(
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
            );
        )+

        $lead
            .inner()
            .dyn_inner_star_join_mono(&factories, &[$($other.inner()),+], &funcs)
            .typed()
    }};
}
383
/// Expands to the body of a star-join-flatmap method (with per-stream `saturate` flags) on
/// a `RootCircuit` stream.
///
/// Arguments, in order:
///
/// * the leading stream expression;
/// * a bracketed, non-empty list of `(stream, saturate)` expression pairs;
/// * the identifier of the join function;
/// * a bracketed list of value types; a pair of `BatchReaderFactories` over the key type
///   and that value type is registered for each entry;
/// * a bracketed list of index tokens, used to index the trace cursors in the callbacks;
/// * the key type;
/// * the output value type (the output factories are created for `<output value, ()>`).
///
/// The expansion builds the callback array with `star_join_flatmap_funcs!`, fills a
/// `StarJoinFactories`, and calls `dyn_star_join_mono` on the leading stream's `.inner()`
/// with a slice of `(stream.inner(), saturate)` tuples, finishing with `.typed()`.
///
/// `ZWeight` is used unqualified and must be in scope where the macro is expanded, as must
/// the type name `OV` required by the generated callbacks.
#[macro_export]
macro_rules! star_join_flatmap_body {
    (
        $lead:expr,
        [$(($other:expr, $sat:expr)),+ $(,)?],
        $f:ident,
        [$($val:ty),+ $(,)?],
        [$($idx:tt),+ $(,)?],
        $key:ty,
        $out_val:ty
    ) => {{
        use $crate::RootCircuit;
        use $crate::dynamic::{DynData, DynUnit};
        use $crate::operator::dynamic::MonoIndexedZSet;
        use $crate::operator::dynamic::multijoin::{StarJoinFactories, StarJoinFunc};
        use $crate::trace::BatchReaderFactories;

        // One callback more than there are index tokens, matching what
        // `generate_all_join_orderings!` emits.
        let funcs: [
            StarJoinFunc<RootCircuit, MonoIndexedZSet, DynData, DynUnit>;
            $crate::count_tts!($($idx),+) + 1
        ] = $crate::star_join_flatmap_funcs!($f, prefix_cursor, trace_cursors, [$($idx),+]);

        let mut factories = StarJoinFactories::new::<$out_val, ()>();
        $(
            factories.add_input_factories(
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
            );
        )+

        $lead
            .inner()
            .dyn_star_join_mono(&factories, &[$(($other.inner(), $sat)),+], &funcs)
            .typed()
    }};
}
429
/// Expands to the body of an inner star-join method on a `RootCircuit` stream.
///
/// Arguments, in order:
///
/// * the leading stream expression;
/// * a bracketed, non-empty list of the remaining stream expressions;
/// * the identifier of the join function;
/// * a bracketed list of value types; a pair of `BatchReaderFactories` over the key type
///   and that value type is registered for each entry;
/// * a bracketed list of index tokens, used to index the trace cursors in the callbacks;
/// * the key type;
/// * the output value type (the output factories are created for `<output value, ()>`).
///
/// The expansion builds the callback array with `star_join_funcs!`, fills a
/// `StarJoinFactories`, and calls `dyn_inner_star_join_mono` on the leading stream's
/// `.inner()`, finishing with `.typed()`.
///
/// `ZWeight` is used unqualified and must be in scope where the macro is expanded, as must
/// the type name `OV` required by the generated callbacks.
#[macro_export]
macro_rules! inner_star_join_root_body {
    (
        $lead:expr,
        [$($other:expr),+ $(,)?],
        $f:ident,
        [$($val:ty),+ $(,)?],
        [$($idx:tt),+ $(,)?],
        $key:ty,
        $out_val:ty
    ) => {{
        use $crate::RootCircuit;
        use $crate::dynamic::{DynData, DynUnit};
        use $crate::operator::dynamic::MonoIndexedZSet;
        use $crate::operator::dynamic::multijoin::{StarJoinFactories, StarJoinFunc};
        use $crate::trace::BatchReaderFactories;

        // One callback more than there are index tokens, matching what
        // `generate_all_join_orderings!` emits.
        let funcs: [
            StarJoinFunc<RootCircuit, MonoIndexedZSet, DynData, DynUnit>;
            $crate::count_tts!($($idx),+) + 1
        ] = $crate::star_join_funcs!($f, prefix_cursor, trace_cursors, [$($idx),+]);

        let mut factories = StarJoinFactories::new::<$out_val, ()>();
        $(
            factories.add_input_factories(
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
            );
        )+

        $lead
            .inner()
            .dyn_inner_star_join_mono(&factories, &[$($other.inner()),+], &funcs)
            .typed()
    }};
}
475
/// Expands to the body of an inner star-join method on a `NestedCircuit` stream.
///
/// Arguments, in order:
///
/// * the leading stream expression;
/// * a bracketed, non-empty list of the remaining stream expressions;
/// * the identifier of the join function;
/// * a bracketed list of value types; a pair of `BatchReaderFactories` over the key type
///   and that value type is registered for each entry;
/// * a bracketed list of index tokens, used to index the trace cursors in the callbacks;
/// * the key type;
/// * the output value type (the output factories are created for `<output value, ()>`).
///
/// The expansion builds the callback array with `star_join_funcs!`, fills a
/// `StarJoinFactories`, and calls `dyn_inner_star_join_mono` on the leading stream's
/// `.inner()`, finishing with `.typed()`.
///
/// `ZWeight` is used unqualified and must be in scope where the macro is expanded, as must
/// the type name `OV` required by the generated callbacks.
#[macro_export]
macro_rules! inner_star_join_body {
    (
        $lead:expr,
        [$($other:expr),+ $(,)?],
        $f:ident,
        [$($val:ty),+ $(,)?],
        [$($idx:tt),+ $(,)?],
        $key:ty,
        $out_val:ty
    ) => {{
        use $crate::NestedCircuit;
        use $crate::dynamic::{DynData, DynUnit};
        use $crate::operator::dynamic::MonoIndexedZSet;
        use $crate::operator::dynamic::multijoin::{StarJoinFactories, StarJoinFunc};
        use $crate::trace::BatchReaderFactories;

        // One callback more than there are index tokens, matching what
        // `generate_all_join_orderings!` emits.
        let funcs: [
            StarJoinFunc<NestedCircuit, MonoIndexedZSet, DynData, DynUnit>;
            $crate::count_tts!($($idx),+) + 1
        ] = $crate::star_join_funcs!($f, prefix_cursor, trace_cursors, [$($idx),+]);

        let mut factories = StarJoinFactories::new::<$out_val, ()>();
        $(
            factories.add_input_factories(
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
            );
        )+

        $lead
            .inner()
            .dyn_inner_star_join_mono(&factories, &[$($other.inner()),+], &funcs)
            .typed()
    }};
}
522
/// Expands to the body of a star-join method (with per-stream `saturate` flags) on a
/// `RootCircuit` stream.
///
/// Arguments, in order:
///
/// * the leading stream expression;
/// * a bracketed, non-empty list of `(stream, saturate)` expression pairs;
/// * the identifier of the join function;
/// * a bracketed list of value types; a pair of `BatchReaderFactories` over the key type
///   and that value type is registered for each entry;
/// * a bracketed list of index tokens, used to index the trace cursors in the callbacks;
/// * the key type;
/// * the output value type (the output factories are created for `<output value, ()>`).
///
/// The expansion builds the callback array with `star_join_funcs!`, fills a
/// `StarJoinFactories`, and calls `dyn_star_join_mono` on the leading stream's `.inner()`
/// with a slice of `(stream.inner(), saturate)` tuples, finishing with `.typed()`.
///
/// `ZWeight` is used unqualified and must be in scope where the macro is expanded, as must
/// the type name `OV` required by the generated callbacks.
#[macro_export]
macro_rules! star_join_body {
    (
        $lead:expr,
        [$(($other:expr, $sat:expr)),+ $(,)?],
        $f:ident,
        [$($val:ty),+ $(,)?],
        [$($idx:tt),+ $(,)?],
        $key:ty,
        $out_val:ty
    ) => {{
        use $crate::RootCircuit;
        use $crate::dynamic::{DynData, DynUnit};
        use $crate::operator::dynamic::MonoIndexedZSet;
        use $crate::operator::dynamic::multijoin::{StarJoinFactories, StarJoinFunc};
        use $crate::trace::BatchReaderFactories;

        // One callback more than there are index tokens, matching what
        // `generate_all_join_orderings!` emits.
        let funcs: [
            StarJoinFunc<RootCircuit, MonoIndexedZSet, DynData, DynUnit>;
            $crate::count_tts!($($idx),+) + 1
        ] = $crate::star_join_funcs!($f, prefix_cursor, trace_cursors, [$($idx),+]);

        let mut factories = StarJoinFactories::new::<$out_val, ()>();
        $(
            factories.add_input_factories(
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
                BatchReaderFactories::new::<$key, $val, ZWeight>(),
            );
        )+

        $lead
            .inner()
            .dyn_star_join_mono(&factories, &[$(($other.inner(), $sat)),+], &funcs)
            .typed()
    }};
}
568
/// Generate an `inner_star_joinN` method.
///
/// Call this macro for every number N in order to generate an N-way inner star join
/// operator for both RootCircuit and NestedCircuit.
///
/// The operator computes an incremental join of multiple streams on the same key using a
/// user-provided join function that returns exactly one output value for each input tuple.
///
/// `N` is passed as an integer literal and must be at least 2: the additional streams are
/// numbered `2..=N`, and the body macros require at least one of them.
///
/// The expansion invokes `seq_macro::seq!` and `paste::paste!` by those paths, so both
/// crates must be resolvable where this macro is used.
///
/// Example generated function signature:
///
/// ```text
/// impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
/// where
///     K: DBData,
///     V1: DBData,
/// {
///     pub fn inner_star_join4<V2, V3, V4, OV>(
///         &self,
///         stream2: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>,
///         stream3: &Stream<RootCircuit, OrdIndexedZSet<K, V3>>,
///         stream4: &Stream<RootCircuit, OrdIndexedZSet<K, V4>>,
///         join_func: impl Fn(&K, &V1, &V2, &V3, &V4) -> OV + Clone + 'static,
///     ) -> Stream<RootCircuit, OrdZSet<OV>>
///     where
///         V2: DBData,
///         V3: DBData,
///         V4: DBData,
///         OV: DBData;
/// }
/// ```
#[macro_export]
macro_rules! define_inner_star_join {
    ($n:literal) => {
        seq_macro::seq!(I in 2..=$n {
            paste::paste! {
                // RootCircuit variant.
                impl<K, V1> $crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V1>>
                where
                    K: $crate::DBData,
                    V1: $crate::DBData,
                {
                    pub fn [<inner_star_join $n>]<#(V~I,)* OV>(
                        &self,
                        #(
                            stream~I: &$crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V~I>>,
                        )*
                        join_func: impl Fn(&K, &V1, #(&V~I,)*) -> OV + Clone + 'static,
                    ) -> $crate::Stream<$crate::RootCircuit, $crate::OrdZSet<OV>>
                    where
                        #(V~I: $crate::DBData,)*
                        OV: $crate::DBData,
                    {
                        // `(I - 2)` is the zero-based position of `streamI` among the
                        // additional streams; the callbacks use it to index the trace cursors.
                        $crate::inner_star_join_root_body!(
                            self,
                            [#(stream~I,)*],
                            join_func,
                            [V1, #(V~I,)*],
                            [#((I - 2),)*],
                            K,
                            OV
                        )
                    }
                }

                // NestedCircuit variant: same signature, different circuit type and body macro.
                impl<K, V1> $crate::Stream<$crate::NestedCircuit, $crate::OrdIndexedZSet<K, V1>>
                where
                    K: $crate::DBData,
                    V1: $crate::DBData,
                {
                    pub fn [<inner_star_join $n>]<#(V~I,)* OV>(
                        &self,
                        #(
                            stream~I: &$crate::Stream<$crate::NestedCircuit, $crate::OrdIndexedZSet<K, V~I>>,
                        )*
                        join_func: impl Fn(&K, &V1, #(&V~I,)*) -> OV + Clone + 'static,
                    ) -> $crate::Stream<$crate::NestedCircuit, $crate::OrdZSet<OV>>
                    where
                        #(V~I: $crate::DBData,)*
                        OV: $crate::DBData,
                    {
                        $crate::inner_star_join_body!(
                            self,
                            [#(stream~I,)*],
                            join_func,
                            [V1, #(V~I,)*],
                            [#((I - 2),)*],
                            K,
                            OV
                        )
                    }
                }
            }
        });
    };
}
663
/// Generate an `inner_star_join_indexN` method.
///
/// Call this macro for every number N in order to generate an N-way inner star join
/// index operator for both RootCircuit and NestedCircuit.
///
/// The operator computes an incremental join of multiple streams on the same key using a
/// user-provided join function that can return 0 or more output key-value pairs for each input tuple.
///
/// `N` is passed as an integer literal and must be at least 2: the additional streams are
/// numbered `2..=N`, and the body macros require at least one of them.
///
/// The expansion invokes `seq_macro::seq!` and `paste::paste!` by those paths, so both
/// crates must be resolvable where this macro is used.
///
/// Example generated function signature:
///
/// ```text
/// impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
/// where
///     K: DBData,
///     V1: DBData,
/// {
///     pub fn inner_star_join_index4<V2, V3, V4, OK, OV, It>(
///         &self,
///         stream2: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>,
///         stream3: &Stream<RootCircuit, OrdIndexedZSet<K, V3>>,
///         stream4: &Stream<RootCircuit, OrdIndexedZSet<K, V4>>,
///         join_func: impl Fn(&K, &V1, &V2, &V3, &V4) -> It + Clone + 'static,
///     ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
///     where
///         V2: DBData,
///         V3: DBData,
///         V4: DBData,
///         OK: DBData,
///         OV: DBData,
///         It: IntoIterator<Item = (OK, OV)> + 'static;
/// }
/// ```
#[macro_export]
macro_rules! define_inner_star_join_index {
    ($n:literal) => {
        seq_macro::seq!(I in 2..=$n {
            paste::paste! {
                // RootCircuit variant.
                impl<K, V1> $crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V1>>
                where
                    K: $crate::DBData,
                    V1: $crate::DBData,
                {
                    pub fn [<inner_star_join_index $n>]<#(V~I,)* OK, OV, It>(
                        &self,
                        #(
                            stream~I: &$crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V~I>>,
                        )*
                        join_func: impl Fn(&K, &V1, #(&V~I,)*) -> It + Clone + 'static,
                    ) -> $crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<OK, OV>>
                    where
                        #(V~I: $crate::DBData,)*
                        OK: $crate::DBData,
                        OV: $crate::DBData,
                        It: IntoIterator<Item = (OK, OV)> + 'static,
                    {
                        // `(I - 2)` is the zero-based position of `streamI` among the
                        // additional streams; the callbacks use it to index the trace cursors.
                        $crate::inner_star_join_index_root_body!(
                            self,
                            [#(stream~I,)*],
                            join_func,
                            [V1, #(V~I,)*],
                            [#((I - 2),)*],
                            K,
                            OK,
                            OV
                        )
                    }
                }

                // NestedCircuit variant: same signature, different circuit type and body macro.
                impl<K, V1> $crate::Stream<$crate::NestedCircuit, $crate::OrdIndexedZSet<K, V1>>
                where
                    K: $crate::DBData,
                    V1: $crate::DBData,
                {
                    pub fn [<inner_star_join_index $n>]<#(V~I,)* OK, OV, It>(
                        &self,
                        #(
                            stream~I: &$crate::Stream<$crate::NestedCircuit, $crate::OrdIndexedZSet<K, V~I>>,
                        )*
                        join_func: impl Fn(&K, &V1, #(&V~I,)*) -> It + Clone + 'static,
                    ) -> $crate::Stream<$crate::NestedCircuit, $crate::OrdIndexedZSet<OK, OV>>
                    where
                        #(V~I: $crate::DBData,)*
                        OK: $crate::DBData,
                        OV: $crate::DBData,
                        It: IntoIterator<Item = (OK, OV)> + 'static,
                    {
                        $crate::inner_star_join_index_body!(
                            self,
                            [#(stream~I,)*],
                            join_func,
                            [V1, #(V~I,)*],
                            [#((I - 2),)*],
                            K,
                            OK,
                            OV
                        )
                    }
                }
            }
        });
    };
}
766
/// Generate an `inner_star_join_flatmapN` method.
///
/// Call this macro for every number N in order to generate an N-way inner star join
/// flatmap operator for both RootCircuit and NestedCircuit.
///
/// The operator computes an incremental join of multiple streams on the same key using a
/// user-provided join function that can return 0 or more output values for each input tuple.
///
/// `N` is passed as an integer literal and must be at least 2: the additional streams are
/// numbered `2..=N`, and the body macros require at least one of them.
///
/// The expansion invokes `seq_macro::seq!` and `paste::paste!` by those paths, so both
/// crates must be resolvable where this macro is used.
///
/// Example generated function signature:
///
/// ```text
/// impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
/// where
///     K: DBData,
///     V1: DBData,
/// {
///     pub fn inner_star_join_flatmap4<V2, V3, V4, OV, It>(
///         &self,
///         stream2: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>,
///         stream3: &Stream<RootCircuit, OrdIndexedZSet<K, V3>>,
///         stream4: &Stream<RootCircuit, OrdIndexedZSet<K, V4>>,
///         join_func: impl Fn(&K, &V1, &V2, &V3, &V4) -> It + Clone + 'static,
///     ) -> Stream<RootCircuit, OrdZSet<OV>>
///     where
///         V2: DBData,
///         V3: DBData,
///         V4: DBData,
///         OV: DBData,
///         It: IntoIterator<Item = OV> + 'static;
/// }
/// ```
#[macro_export]
macro_rules! define_inner_star_join_flatmap {
    ($n:literal) => {
        seq_macro::seq!(I in 2..=$n {
            paste::paste! {
                // RootCircuit variant.
                impl<K, V1> $crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V1>>
                where
                    K: $crate::DBData,
                    V1: $crate::DBData,
                {
                    pub fn [<inner_star_join_flatmap $n>]<#(V~I,)* OV, It>(
                        &self,
                        #(
                            stream~I: &$crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V~I>>,
                        )*
                        join_func: impl Fn(&K, &V1, #(&V~I,)*) -> It + Clone + 'static,
                    ) -> $crate::Stream<$crate::RootCircuit, $crate::OrdZSet<OV>>
                    where
                        #(V~I: $crate::DBData,)*
                        OV: $crate::DBData,
                        It: IntoIterator<Item = OV> + 'static,
                    {
                        // `(I - 2)` is the zero-based position of `streamI` among the
                        // additional streams; the callbacks use it to index the trace cursors.
                        $crate::inner_star_join_flatmap_root_body!(
                            self,
                            [#(stream~I,)*],
                            join_func,
                            [V1, #(V~I,)*],
                            [#((I - 2),)*],
                            K,
                            OV
                        )
                    }
                }

                // NestedCircuit variant: same signature, different circuit type and body macro.
                impl<K, V1> $crate::Stream<$crate::NestedCircuit, $crate::OrdIndexedZSet<K, V1>>
                where
                    K: $crate::DBData,
                    V1: $crate::DBData,
                {
                    pub fn [<inner_star_join_flatmap $n>]<#(V~I,)* OV, It>(
                        &self,
                        #(
                            stream~I: &$crate::Stream<$crate::NestedCircuit, $crate::OrdIndexedZSet<K, V~I>>,
                        )*
                        join_func: impl Fn(&K, &V1, #(&V~I,)*) -> It + Clone + 'static,
                    ) -> $crate::Stream<$crate::NestedCircuit, $crate::OrdZSet<OV>>
                    where
                        #(V~I: $crate::DBData,)*
                        OV: $crate::DBData,
                        It: IntoIterator<Item = OV> + 'static,
                    {
                        $crate::inner_star_join_flatmap_body!(
                            self,
                            [#(stream~I,)*],
                            join_func,
                            [V1, #(V~I,)*],
                            [#((I - 2),)*],
                            K,
                            OV
                        )
                    }
                }
            }
        });
    };
}
864
/// Generate a `star_joinN` method.
///
/// Call this macro for every number N in order to generate an N-way star-join operator.
/// This operator is only available for RootCircuit. See `define_inner_star_join` for a
/// star join operator that works for NestedCircuit.
///
/// The operator computes an incremental join of multiple streams on the same key using a
/// user-provided join function that must return exactly one value per input tuple.  The
/// `saturate` flag for each input stream (except the first one) controls whether to compute
/// an outer or inner join for that stream.
///
/// `N` is passed as an integer literal and must be at least 2: the additional streams are
/// numbered `2..=N`, and the body macro requires at least one of them.
///
/// The expansion invokes `seq_macro::seq!` and `paste::paste!` by those paths, so both
/// crates must be resolvable where this macro is used.
///
/// Example generated function signature:
///
/// ```text
/// impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
/// where
///     K: DBData,
///     V1: DBData,
/// {
///     pub fn star_join4<V2, V3, V4, OV>(
///         &self,
///         (stream2, saturate2): (&Stream<RootCircuit, OrdIndexedZSet<K, V2>>, bool),
///         (stream3, saturate3): (&Stream<RootCircuit, OrdIndexedZSet<K, V3>>, bool),
///         (stream4, saturate4): (&Stream<RootCircuit, OrdIndexedZSet<K, V4>>, bool),
///         join_func: impl Fn(&K, &V1, &V2, &V3, &V4) -> OV + Clone + 'static,
///     ) -> Stream<RootCircuit, OrdZSet<OV>>
///     where
///         V2: DBData,
///         V3: DBData,
///         V4: DBData,
///         OV: DBData;
/// }
/// ```
#[macro_export]
macro_rules! define_star_join {
    ($n:literal) => {
        seq_macro::seq!(I in 2..=$n {
            paste::paste! {
                impl<K, V1> $crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V1>>
                where
                    K: $crate::DBData,
                    V1: $crate::DBData,
                {
                    pub fn [<star_join $n>]<#(V~I,)* OV>(
                        &self,
                        // Each additional input is a `(stream, saturate)` tuple that is
                        // destructured directly in the parameter list.
                        #(
                            (stream~I, saturate~I): (
                                &$crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V~I>>,
                                bool
                            ),
                        )*
                        join_func: impl Fn(&K, &V1, #(&V~I,)*) -> OV + Clone + 'static,
                    ) -> $crate::Stream<$crate::RootCircuit, $crate::OrdZSet<OV>>
                    where
                        #(V~I: $crate::DBData,)*
                        OV: $crate::DBData,
                    {
                        // `(I - 2)` is the zero-based position of `streamI` among the
                        // additional streams; the callbacks use it to index the trace cursors.
                        $crate::star_join_body!(
                            self,
                            [#((stream~I, saturate~I),)*],
                            join_func,
                            [V1, #(V~I,)*],
                            [#((I - 2),)*],
                            K,
                            OV
                        )
                    }
                }
            }
        });
    };
}
937
938/// Generate a `star_join_indexN` function.
939///
940/// Invoke this macro with an integer literal N to generate the N-way star-join-index operator.
941/// The generated method is named `star_join_indexN` and is only available for `RootCircuit`. See
942/// `define_inner_star_join_index` for a star-join-index operator that works for `NestedCircuit`.
943///
944/// The operator computes an incremental join of multiple streams on the same key using a
945/// user-provided join function that can return 0 or more output key-value pairs for each
946/// input tuple. Each additional stream (2 through N) is passed as a `(stream, saturate)` pair;
947/// the `saturate` flag controls whether to compute an outer or inner join for that stream.
948///
949/// Example generated function signature:
950///
951/// ```text
952/// impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
953/// where
954///     K: DBData,
955///     V1: DBData,
956/// {
957///     pub fn star_join_index4<V2, V3, V4, OK, OV, It>(
958///         &self,
959///         (stream2, saturate2): (
960///             &Stream<RootCircuit, OrdIndexedZSet<K, V2>>,
961///             bool,
962///         ),
963///         (stream3, saturate3): (
964///             &Stream<RootCircuit, OrdIndexedZSet<K, V3>>,
965///             bool,
966///         ),
967///         (stream4, saturate4): (
968///             &Stream<RootCircuit, OrdIndexedZSet<K, V4>>,
969///             bool,
970///         ),
971///         join_func: impl Fn(&K, &V1, &V2, &V3, &V4) -> It + Clone + 'static,
972///     ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
973///     where
974///         V2: DBData,
975///         V3: DBData,
976///         V4: DBData,
977///         OK: DBData,
978///         OV: DBData,
979///         It: IntoIterator<Item = (OK, OV)> + 'static;
980/// }
981/// ```
982#[macro_export]
983macro_rules! define_star_join_index {
984    ($n:literal) => {
985        seq_macro::seq!(I in 2..=$n { // I enumerates the additional streams 2..=N
986            paste::paste! { // `[<star_join_index $n>]` below is pasted into a single method identifier
987                impl<K, V1> $crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V1>>
988                where
989                    K: $crate::DBData,
990                    V1: $crate::DBData,
991                {
992                    pub fn [<star_join_index $n>]<#(V~I,)* OK, OV, It>(
993                        &self,
994                        #( // one `(stream, saturate)` parameter per value of I
995                            (stream~I, saturate~I): (
996                                &$crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V~I>>,
997                                bool
998                            ),
999                        )*
1000                        join_func: impl Fn(&K, &V1, #(&V~I,)*) -> It + Clone + 'static,
1001                    ) -> $crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<OK, OV>>
1002                    where
1003                        #(V~I: $crate::DBData,)*
1004                        OK: $crate::DBData,
1005                        OV: $crate::DBData,
1006                        It: IntoIterator<Item = (OK, OV)> + 'static,
1007                    {
1008                        $crate::star_join_index_body!(
1009                            self,
1010                            [#((stream~I, saturate~I),)*],
1011                            join_func,
1012                            [V1, #(V~I,)*], // value types of all N streams, first stream included
1013                            [#((I - 2),)*], // 0, 1, ..., N-2: one zero-based index per additional stream
1014                            K,
1015                            OK,
1016                            OV
1017                        )
1018                    }
1019                }
1020            }
1021        });
1022    };
1023}
1024
1025/// Generate a `star_join_flatmapN` function.
1026///
1027/// Invoke this macro with an integer literal N to generate the N-way star join flatmap
1028/// operator. The generated method is named `star_join_flatmapN` and is only available for
1029/// `RootCircuit`. See `define_inner_star_join_flatmap` for the `NestedCircuit` counterpart.
1030///
1031/// The operator computes an incremental join of multiple streams on the same key using a
1032/// user-provided join function that can return 0 or more output values for each input tuple.
1033/// Each additional stream (2 through N) is passed as a `(stream, saturate)` pair; the
1034/// `saturate` flag controls whether to compute an outer or inner join for that stream. The
1035/// generated method returns an `OrdZSet<OV>` stream, where `OV` is the item type of the
1036/// iterator returned by the join function.
1037///
1038/// Example generated function signature:
1039///
1040/// ```text
1041/// impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
1042/// where
1043///     K: DBData,
1044///     V1: DBData,
1045/// {
1046///     pub fn star_join_flatmap4<V2, V3, V4, OV, It>(
1047///         &self,
1048///         (stream2, saturate2): (&Stream<RootCircuit, OrdIndexedZSet<K, V2>>, bool),
1049///         (stream3, saturate3): (&Stream<RootCircuit, OrdIndexedZSet<K, V3>>, bool),
1050///         (stream4, saturate4): (&Stream<RootCircuit, OrdIndexedZSet<K, V4>>, bool),
1051///         join_func: impl Fn(&K, &V1, &V2, &V3, &V4) -> It + Clone + 'static,
1052///     ) -> Stream<RootCircuit, OrdZSet<OV>>
1053///     where
1054///         V2: DBData,
1055///         V3: DBData,
1056///         V4: DBData,
1057///         OV: DBData,
1058///         It: IntoIterator<Item = OV> + 'static;
1059/// }
1060/// ```
1061#[macro_export]
1062macro_rules! define_star_join_flatmap {
1063    ($n:literal) => {
1064        seq_macro::seq!(I in 2..=$n { // I enumerates the additional streams 2..=N
1065            paste::paste! { // `[<star_join_flatmap $n>]` below is pasted into a single method identifier
1066                impl<K, V1> $crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V1>>
1067                where
1068                    K: $crate::DBData,
1069                    V1: $crate::DBData,
1070                {
1071                    pub fn [<star_join_flatmap $n>]<#(V~I,)* OV, It>(
1072                        &self,
1073                        #( // one `(stream, saturate)` parameter per value of I
1074                            (stream~I, saturate~I): (
1075                                &$crate::Stream<$crate::RootCircuit, $crate::OrdIndexedZSet<K, V~I>>,
1076                                bool
1077                            ),
1078                        )*
1079                        join_func: impl Fn(&K, &V1, #(&V~I,)*) -> It + Clone + 'static,
1080                    ) -> $crate::Stream<$crate::RootCircuit, $crate::OrdZSet<OV>>
1081                    where
1082                        #(V~I: $crate::DBData,)*
1083                        OV: $crate::DBData,
1084                        It: IntoIterator<Item = OV> + 'static,
1085                    {
1086                        $crate::star_join_flatmap_body!(
1087                            self,
1088                            [#((stream~I, saturate~I),)*],
1089                            join_func,
1090                            [V1, #(V~I,)*], // value types of all N streams, first stream included
1091                            [#((I - 2),)*], // 0, 1, ..., N-2: one zero-based index per additional stream
1092                            K,
1093                            OV
1094                        )
1095                    }
1096                }
1097            }
1098        });
1099    };
1100}