1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/// A value aggregated from per-session contributions of type `Agg`.
///
/// Every contributing session owns one cell in `live`; the overall value is the join of
/// all live cells.
#[derive(Clone)]
pub struct Aggregated<Agg, Seq>
where
    Agg: Aggregatable,
{
    /// Cells last modified before this sequence are dropped when aggregates are merged.
    /// With `None`, no cell is ever dropped.
    rollup_threshold: Option<Seq>,
    /// One cell per session that has contributed a value.
    live: std::collections::HashMap<crate::Session, crate::Cell<Agg, Seq>>,
}
/// A value that can absorb other values of its own type.
pub trait Aggregatable: Clone {
    /// Folds `that` into `this`, consuming `that`.
    fn join(this: &mut Self, that: Self);

    /// Folds a borrowed `that` into `this`.
    ///
    /// The default implementation clones `that` and forwards to [`Aggregatable::join`];
    /// implementors can override it when joining by reference is cheaper.
    fn join_ref(this: &mut Self, that: &Self) {
        let owned = Self::clone(that);
        Self::join(this, owned);
    }

    /// Hook for normalizing a joined value in place. Does nothing by default.
    fn canonicalize(_this: &mut Self) {}
}
impl<Agg, SF> Aggregated<Agg, crate::Sequence<SF>>
where
    Agg: Aggregatable,
    SF: crate::SequenceFactory,
{
    /// Folds `value` into the cell owned by the session of `ctx`.
    ///
    /// When the session has no cell yet, a new one is created from `value`; otherwise
    /// `value` is joined into the existing cell's contents obtained via `borrow_mut(ctx)`.
    pub fn update(&mut self, ctx: &mut crate::Context<SF>, value: Agg) {
        use std::collections::hash_map::Entry;

        let session = ctx.session();
        match self.live.entry(session) {
            Entry::Vacant(slot) => {
                slot.insert(crate::Cell::new(ctx, value));
            }
            Entry::Occupied(slot) => {
                let cell = slot.into_mut();
                Agg::join(cell.borrow_mut(ctx), value);
            }
        }
    }
}
impl<Agg: Aggregatable, Seq> Aggregated<Agg, Seq> {
    /// Returns the join of every live cell, passed through [`Aggregatable::canonicalize`].
    ///
    /// The first cell (in the map's iteration order) is cloned as the starting value and
    /// every remaining cell is folded in with [`Aggregatable::join_ref`].
    ///
    /// # Panics
    ///
    /// Panics if there are no live cells, i.e. nothing has been contributed to this
    /// aggregate yet. `Agg` has no `Default` bound here, so there is no value to return
    /// in that case.
    pub fn value(&self) -> Agg {
        let mut cells = self.live.iter();
        let (_, first) = cells
            .next()
            .expect("Aggregated::value called on an aggregate with no live cells");
        let mut value = (*first).clone();
        for (_, cell) in cells {
            Agg::join_ref(&mut value, &*cell);
        }
        Agg::canonicalize(&mut value);
        value
    }
}
impl<Agg, SF> crate::Mergable for Aggregated<Agg, crate::Sequence<SF>>
where
    Agg: Aggregatable,
    SF: crate::SequenceFactory,
{
    type Diff = crate::Opaque<Self>;

    /// Merges `that` into `self`.
    ///
    /// 1. If `that` carries a newer rollup threshold, adopt it and drop every local cell
    ///    last modified before it.
    /// 2. For each incoming cell that is not older than the (possibly updated) threshold,
    ///    merge it into the local cell of the same session, or insert it if the session
    ///    is not present locally.
    fn merge(&mut self, that: Self) {
        use std::collections::hash_map::Entry;

        if self.rollup_threshold < that.rollup_threshold {
            self.rollup_threshold = that.rollup_threshold;
            let threshold = &self.rollup_threshold;
            self.live
                .retain(|_, cell| Some(cell.last_modified()) >= *threshold);
        }
        for (session, cell) in that.live {
            // Incoming cells below the threshold are skipped.
            if Some(cell.last_modified()) < self.rollup_threshold {
                continue;
            }
            match self.live.entry(session) {
                Entry::Occupied(slot) => {
                    slot.into_mut().merge(cell);
                }
                Entry::Vacant(slot) => {
                    slot.insert(cell);
                }
            }
        }
    }

    /// Builds an opaque diff from `self`.
    ///
    /// The diff carries the larger of the two rollup thresholds and a clone of every cell
    /// of `self` that is not older than that threshold. Cells of `other` are not consulted.
    fn diff(&self, other: &Self) -> Self::Diff {
        let rollup_threshold = self
            .rollup_threshold
            .clone()
            .max(other.rollup_threshold.clone());
        let mut live = std::collections::HashMap::new();
        for (session, cell) in &self.live {
            if Some(cell.last_modified()) < rollup_threshold {
                continue;
            }
            live.insert(session.clone(), cell.clone());
        }
        crate::Opaque(Self {
            rollup_threshold,
            live,
        })
    }

    /// Applies a diff produced by `diff`.
    ///
    /// A diff wraps a whole `Aggregated`, so applying it is exactly a merge of the wrapped
    /// value; this always returns `Ok(())`.
    fn apply(&mut self, that: Self::Diff) -> Result<(), crate::ApplyError> {
        // Fully qualified so the call resolves to this trait's `merge` regardless of
        // which traits are imported at the call site.
        <Self as crate::Mergable>::merge(self, that.0);
        Ok(())
    }
}
impl<Agg, SF> Default for Aggregated<Agg, crate::Sequence<SF>>
where
    Agg: Aggregatable + Default,
    SF: crate::SequenceFactory,
{
    /// Creates an aggregate with no rollup threshold and no live cells.
    fn default() -> Self {
        Self {
            rollup_threshold: None,
            live: std::collections::HashMap::new(),
        }
    }
}
impl<Agg: Aggregatable + std::cmp::PartialEq, Seq> std::cmp::PartialEq for Aggregated<Agg, Seq> {
    /// Compares two aggregates by their aggregated values.
    ///
    /// Only the result of `value()` is compared; the per-session cells and the rollup
    /// threshold do not take part. Because `value()` panics when there are no live cells,
    /// empty aggregates are handled here explicitly: two empty aggregates are equal, and
    /// an empty aggregate is never equal to a non-empty one.
    fn eq(&self, that: &Self) -> bool {
        match (self.live.is_empty(), that.live.is_empty()) {
            (true, true) => true,
            (false, false) => self.value() == that.value(),
            _ => false,
        }
    }
}
impl<Agg: Aggregatable + std::fmt::Debug, Seq> std::fmt::Debug for Aggregated<Agg, Seq> {
    /// Formats the aggregate as its aggregated value.
    ///
    /// An aggregate with no live cells has no value (and `value()` would panic), so it is
    /// rendered as the placeholder `Aggregated(<empty>)` instead. Formatting must not
    /// panic, since it is typically invoked while reporting another failure.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        if self.live.is_empty() {
            return f.write_str("Aggregated(<empty>)");
        }
        std::fmt::Debug::fmt(&self.value(), f)
    }
}
// Two aggregates, each updated once in its own context, are handed to
// `crate::test::test_merge`; the merged value must be the join of both contributions.
#[test]
fn test() {
    // First aggregate contributes 5.
    let mut left_ctx = crate::Context::default();
    let mut left = Aggregated::default();
    left.update(&mut left_ctx, 5);

    // Second aggregate, with an independent context, contributes 7.
    let mut right_ctx = crate::Context::default();
    let mut right = Aggregated::default();
    right.update(&mut right_ctx, 7);

    let merged = crate::test::test_merge(&mut [left, right]);
    // 5 joined with 7 under the additive integer impl.
    assert_eq!(merged.value(), 12);
}
/// `u8` values aggregate by addition.
impl Aggregatable for u8 {
    /// Adds `b` to `*a`. Overflow behaves as it does for the built-in `+` operator.
    fn join(a: &mut u8, b: u8) {
        let sum = *a + b;
        *a = sum;
    }
}