1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use crate::{
DBData, OrdIndexedZSet, OrdZSet, RootCircuit, Stream,
dynamic::{DowncastTrait, DynData, DynUnit, Erase},
};
use super::dynamic::asof_join::AsofJoinFactories;
impl<K1, V1> Stream<RootCircuit, OrdIndexedZSet<K1, V1>>
where
    K1: DBData,
    V1: DBData,
{
    /// Asof-join operator.
    ///
    /// An asof-join operator combines records from two tables based on a common key
    /// (similar to an equi-join), as well as a timestamp. It assumes that both tables
    /// contain a timestamp column (`ts`). It matches each value `v` in `self` with
    /// the value in `other` that has the same key and the largest timestamp not
    /// exceeding `v.ts`. If there are multiple values with the same timestamp, the
    /// operator picks the largest one based on the ordering (according to `Ord`) on
    /// type `V2`. If `other` contains no value `v2` with the same key such that
    /// `v2.ts <= v.ts`, then `None` is passed to `join`, i.e., this operator behaves
    /// like a left join.
    ///
    /// The operator assumes that values in both collections are sorted by timestamp,
    /// i.e., `impl Ord for V1` must satisfy `ts_func1(v) < ts_func1(u) ==> v < u`.
    /// Similarly for `V2`: `ts_func2(v) < ts_func2(u) ==> v < u`.
    ///
    /// # Arguments
    ///
    /// * `self` - the left-hand side of the join.
    /// * `other` - the right-hand side of the join.
    /// * `join` - join function that maps a key, a value from `self`, and an optional
    ///   value from `other` to an output value.
    /// * `ts_func1` - extracts the value of the timestamp column from a record in `self`.
    /// * `ts_func2` - extracts the value of the timestamp column from a record in `other`.
    ///
    /// # Returns
    ///
    /// A stream of `OrdZSet<V>` built from the values produced by `join`.
    #[track_caller]
    pub fn asof_join<TS, F, TSF1, TSF2, V2, V>(
        &self,
        other: &Stream<RootCircuit, OrdIndexedZSet<K1, V2>>,
        join: F,
        ts_func1: TSF1,
        ts_func2: TSF2,
    ) -> Stream<RootCircuit, OrdZSet<V>>
    where
        TS: DBData,
        V2: DBData,
        V: DBData,
        F: Fn(&K1, &V1, Option<&V2>) -> V + Clone + 'static,
        TSF1: Fn(&V1) -> TS + Clone + 'static,
        TSF2: Fn(&V2) -> TS + 'static,
    {
        // `join_factories` is built from the concrete types `TS, K1, V1, V2, V`. The
        // unchecked downcasts in the closures below assume that the dynamic operator
        // only hands them values of these same types.
        let join_factories = AsofJoinFactories::new::<TS, K1, V1, V2, V, ()>();

        // `ts_func1` is used by three closures. The first two get their own copies,
        // and the third takes ownership of the original.
        let ts_func1_extract = ts_func1.clone();
        // Writes the timestamp of a left-hand value into `ts`.
        let dyn_ts_func1 = Box::new(move |v: &DynData, ts: &mut DynData| {
            // SAFETY: relies on `v` being a `V1` and `ts` being a `TS`, matching the
            // types used to build `join_factories`.
            unsafe { *ts.downcast_mut() = ts_func1_extract(v.downcast()) }
        });

        let ts_func1_cmp = ts_func1.clone();
        // Compares the timestamp of a left-hand value with that of a right-hand value.
        let tscmp_func = Box::new(move |v1: &DynData, v2: &DynData| {
            // SAFETY: relies on `v1` being a `V1` and `v2` being a `V2`.
            unsafe { ts_func1_cmp(v1.downcast()).cmp(&ts_func2(v2.downcast())) }
        });

        // Compares the timestamp of a left-hand value with an already-extracted timestamp.
        let valts_cmp_func = Box::new(move |v1: &DynData, ts: &DynData| {
            // SAFETY: relies on `v1` being a `V1` and `ts` being a `TS`.
            unsafe { ts_func1(v1.downcast()).cmp(ts.downcast()) }
        });

        // Applies the user's `join` function and hands the erased result (with a unit
        // value) to the operator-supplied callback.
        let join_func = Box::new(
            move |key: &DynData,
                  v1: &DynData,
                  v2: Option<&DynData>,
                  cb: &mut dyn FnMut(&mut DynData, &mut DynUnit)| {
                // SAFETY: relies on `key` being a `K1`, `v1` a `V1`, and `v2` (when
                // present) a `V2`.
                unsafe {
                    let mut v = join(key.downcast(), v1.downcast(), v2.map(|v| v.downcast()));
                    cb(v.erase_mut(), ().erase_mut());
                }
            },
        );

        self.inner()
            .dyn_asof_join_mono(
                &join_factories,
                &other.inner(),
                dyn_ts_func1,
                tscmp_func,
                valts_cmp_func,
                join_func,
            )
            .typed()
    }
}