1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
use crate::{
CmpFunc, DBData, OrdIndexedZSet, RootCircuit, Stream,
dynamic::{DowncastTrait, DynData},
operator::{dynamic::group::RowNumberCustomOrdFactories, group::WithCustomOrd},
};
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where
K: DBData,
V: DBData,
{
/// Row number operator.
///
/// This operator implements the behavior of the following SQL pattern:
/// ```text
/// SELECT
/// ...,
/// ROW_NUMBER() OVER (PARTITION BY .. ORDER BY ...) AS rank
/// FROM table
/// ```
///
/// The `CF` type establishes the ordering of values in the group.
///
/// The `output_func` closure takes a value and its row number and produces an
/// output value.
///
/// ## Correctness
///
/// * `CF` must establish a total order over `V`, consistent with `impl Eq
/// for V`, i.e., `CF::cmp(v1, v2) == Equal <=> v1.eq(v2)`.
/// * `CF` must be consistent with `rank_cmp_func`, i.e.,
/// - `CF::cmp(v1, v2) == Less => rank_cmp_func(v1, v2) != Greater`.
/// - `CF::cmp(v1, v2) == Equal => rank_cmp_func(v1, v2) == Equal`.
/// - `CF::cmp(v1, v2) == Greater => rank_cmp_func(v1, v2) != Less`.
pub fn row_number_custom_order<CF, OF, OV>(
&self,
output_func: OF,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where
CF: CmpFunc<V>,
OV: DBData,
OF: Fn(i64, &V) -> OV + 'static,
{
self.row_number_custom_order_persistent::<CF, OF, OV>(None, output_func)
}
pub fn row_number_custom_order_persistent<CF, OF, OV>(
&self,
persistent_id: Option<&str>,
output_func: OF,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where
CF: CmpFunc<V>,
OV: DBData,
OF: Fn(i64, &V) -> OV + 'static,
{
let factories = RowNumberCustomOrdFactories::<DynData, DynData, DynData>::new::<
K,
WithCustomOrd<V, CF>,
OV,
>();
self.inner()
.dyn_row_number_custom_order_mono(
persistent_id,
&factories,
Box::new(
move |v1, v2: &mut DynData /* <WithCustomOrd<V, CF>> */| unsafe {
*v2.downcast_mut::<WithCustomOrd<V, CF>>() =
WithCustomOrd::new(v1.downcast::<V>().clone())
},
),
Box::new(move |rank, v2, ov| unsafe {
*ov.downcast_mut() =
output_func(rank, &v2.downcast::<WithCustomOrd<V, CF>>().val)
}),
)
.typed()
}
}