Skip to main content

mergable/
counter.rs

1/** A counter.
2 */
3#[derive(Debug,Clone)]
4pub struct Counter<V, Seq> {
5	total: V,
6	live: std::collections::HashMap<crate::Session, crate::Cell<V, Seq>>,
7	rollup_seq: Seq,
8}
9
10impl<
11	V,
12	SF: crate::SequenceFactory,
13> Counter<V, crate::Sequence<SF>>
14{
15	pub fn new(ctx: &mut crate::Context<SF>, value: V) -> Self where V: Clone {
16		let cell = crate::Cell::new(ctx, value.clone());
17		Counter {
18			total: value,
19			rollup_seq: crate::Sequence::zero(),
20			live: IntoIterator::into_iter([(ctx.session(), cell)]).collect()
21		}
22	}
23
24	/** Set this session's value
25	*
26	* This will overwrite any existing values.
27	*/
28	pub fn add(&mut self, ctx: &mut crate::Context<SF>, value: V)
29	where V: for <'a> std::ops::AddAssign<&'a V> + std::ops::AddAssign<V>
30	{
31		match self.live.entry(ctx.session()) {
32			std::collections::hash_map::Entry::Occupied(mut entry) => {
33				self.total += &value;
34				*entry.get_mut().borrow_mut(ctx) += value;
35			}
36			std::collections::hash_map::Entry::Vacant(entry) => {
37				self.total += &value;
38				entry.insert(crate::Cell::new(ctx, value));
39			}
40		}
41	}
42
43	pub fn total(&self) -> &V {
44		&self.total
45	}
46}
47
48impl<
49	V: Clone
50	+ for<'a> std::ops::AddAssign<&'a V>
51	+ for<'a> std::ops::SubAssign<&'a V>,
52	SF: crate::SequenceFactory,
53>
54	crate::Mergable for Counter<V, crate::Sequence<SF>>
55{
56	type Diff = CounterDiff<V, crate::Sequence<SF>>;
57	type Seq = crate::Sequence<SF>;
58
59	// fn merge(&mut self, that: Self) {
60	// 	let removed = that.removed.into_iter()
61	// 		.map(|(what, when)| Update::Del(what, None, when));
62	// 	let live = that.live.into_iter()
63	// 		.map(|(k, Value{sequence, value})| Update::Up(sequence, k, UpdateValue::New(value)));
64	// 	self.apply(removed.chain(live)).unwrap()
65	// }
66
67	fn diff(&self, that: &Self) -> Self::Diff {
68		let mut rollup = (self.rollup_seq != that.rollup_seq)
69			.then(|| (self.rollup_seq.clone(), self.total.clone()));
70
71		let mut updates = Vec::new();
72
73		for (session, v) in &self.live {
74			if let Some((_, ref mut total)) = rollup {
75				*total -= &**v;
76			}
77
78			if v.last_modified() < &that.rollup_seq { continue }
79
80			let update = match that.live.get(&session) {
81				Some(that_v) => {
82					let diff = v.diff(that_v);
83					if crate::Diff::is_empty(&diff) { continue }
84					Update::Up(diff)
85				}
86				None => Update::New(v.clone()),
87			};
88
89			updates.push((session.clone(), update))
90		}
91
92		CounterDiff { rollup, updates }
93	}
94
95	fn apply(&mut self, diff: Self::Diff) -> Result<(), crate::ApplyError> {
96		let CounterDiff{rollup, updates} = diff;
97		if let Some((cutoff, rollup)) = rollup {
98			if cutoff > self.rollup_seq {
99				self.total = rollup;
100				let total = &mut self.total;
101				self.live.retain(|_, v| {
102					if v.last_modified() <= &cutoff { return false }
103
104					*total += &**v;
105					true
106				});
107				self.rollup_seq = cutoff;
108			}
109		}
110
111		for (session, update) in updates {
112			match self.live.entry(session) {
113				std::collections::hash_map::Entry::Occupied(mut entry) => {
114					let v = entry.get_mut();
115
116					self.total -= &**v;
117					match update {
118						Update::New(new) => v.merge(new),
119						Update::Up(diff) => v.apply(diff)?,
120						Update::Del(_) => {
121							entry.remove();
122							continue
123						}
124					}
125					self.total += &**v;
126				}
127				std::collections::hash_map::Entry::Vacant(entry) => {
128					match update {
129						Update::New(new) => {
130							self.total += &*new;
131							entry.insert(new);
132						}
133						Update::Up(_) => {
134							return Err(crate::ApplyError::Missing(
135								"Target doesn't contain base value for diff.".into()))
136						}
137						Update::Del(_) => {}
138					}
139				}
140			}
141		}
142
143		Ok(())
144	}
145
146	fn clean(&mut self, cutoff: &Self::Seq) {
147		if self.rollup_seq >= *cutoff { return }
148
149		self.live.retain(|_, v| v.last_modified() > cutoff);
150		self.rollup_seq = cutoff.clone();
151	}
152}
153
154impl<
155	V: Default,
156	SF: crate::SequenceFactory
157> Default for Counter<V, crate::Sequence<SF>> {
158	fn default() -> Self {
159		Counter {
160			live: Default::default(),
161			rollup_seq: crate::Sequence::zero(),
162			total: Default::default(),
163		}
164	}
165}
166
167impl<V: PartialEq, Seq> PartialEq for Counter<V, Seq> {
168	fn eq(&self, other: &Self) -> bool {
169		return self.total == other.total
170	}
171}
172
173#[derive(Clone,Debug)]
174pub struct CounterDiff<V, Seq> {
175	rollup: Option<(Seq, V)>,
176	updates: Vec<(crate::Session, Update<crate::Cell<V, Seq>, crate::CellDiff<V, Seq>>)>,
177}
178
/** A change to one session's cell inside a [`CounterDiff`].
 */
#[derive(Clone, Debug)]
enum Update<V, Diff> {
	/** A whole cell: inserted when the session is unknown, merged into the existing cell otherwise. */
	New(V),
	/** Removal of the session's cell; produced by reverting a `New`. */
	Del(V),
	/** A diff to apply to the session's existing cell. */
	Up(Diff),
}
185
186impl<
187	V,
188	SF: crate::SequenceFactory,
189> crate::Diff for CounterDiff<V, crate::Sequence<SF>> {
190	fn is_empty(&self) -> bool {
191		self.rollup.is_none() && self.updates.is_empty()
192	}
193
194	fn revert(mut self) -> Result<Self, crate::RevertError> {
195		crate::map_in_place(&mut self.updates, |(session, update)| Ok((session, match update {
196			Update::New(v) => Update::Del(v),
197			Update::Del(v) => Update::New(v),
198			Update::Up(diff) => Update::Up(diff),
199		})))?;
200		Ok(self)
201	}
202}
203
#[test]
fn test_simple() {
	// First replica adds 5 and 3 through its own context.
	let mut ctx_a = crate::Context::default();
	let mut replica_a = Counter::default();
	replica_a.add(&mut ctx_a, 5);
	replica_a.add(&mut ctx_a, 3);

	// Second replica forks from the first and adds 1 and 4 through a second context.
	let mut ctx_b = crate::Context::default();
	let mut replica_b = replica_a.clone();
	replica_b.add(&mut ctx_b, 1);
	replica_b.add(&mut ctx_b, 4);

	// The first replica keeps counting after the fork.
	replica_a.add(&mut ctx_a, 8);

	// 5 + 3 + 1 + 4 + 8
	let merged = crate::test::test_merge(&mut [&replica_b, &replica_a]);
	assert_eq!(merged.total(), &21);
}
221
#[test]
fn test_revert() {
	let mut ctx_a = crate::Context::default();
	let mut base = Counter::default();
	base.add(&mut ctx_a, 5);
	base.add(&mut ctx_a, 3);

	// Fork the counter and add through a second context.
	let mut ctx_b = crate::Context::default();
	let mut fork = base.clone();
	fork.add(&mut ctx_b, 1);
	fork.add(&mut ctx_b, 4);

	let forward = crate::Mergable::diff(&base, &fork);
	let backward = crate::Diff::revert(forward.clone()).unwrap();

	// Applying a diff together with its revert must leave the base total (5 + 3) intact.
	let outcome = crate::test::test_apply(base, &mut [forward, backward]);
	assert_eq!(outcome.total(), &8);
}
240
#[test]
fn test_revert_revive() {
	let mut ctx_a = crate::Context::default();
	let mut base = Counter::default();
	base.add(&mut ctx_a, 5u32);
	base.add(&mut ctx_a, 3);

	let mut ctx_b = crate::Context::default();
	let mut fork = base.clone();
	fork.add(&mut ctx_b, 1);
	fork.add(&mut ctx_b, 4);

	let forward = crate::Mergable::diff(&base, &fork);
	let backward = crate::Diff::revert(forward.clone()).unwrap();

	// The fork keeps counting after the diff was taken.
	fork.add(&mut ctx_b, 7);

	let outcome = crate::test::test_apply(fork, &mut [forward, backward]);
	// FIXME: Right now reverts to parts of sessions get ignored. Ideally this would be 15.
	// We probably need to rethink how our revert system works.
	// It would be possible to do it with the current revert system by creating a negative
	// session for the revert but that would require "signed" types as well as the ability
	// to generate a sequence number in the revert function.
	assert_eq!(outcome.total(), &20);
}
264
#[test]
fn test_clean() {
	let mut ctx_a = crate::Context::default();
	let mut first = Counter::default();
	first.add(&mut ctx_a, 1);
	first.add(&mut ctx_a, 2);

	let mut ctx_b = crate::Context::default();
	let mut second = first.clone();
	second.add(&mut ctx_b, 4);
	second.add(&mut ctx_b, 8);

	first.add(&mut ctx_a, 16);

	// A sequence taken after every change made so far.
	let seq_pre_merge = ctx_a.next_sequence().max(ctx_b.next_sequence());

	// 1 + 2 + 4 + 8 + 16
	let mut merged = crate::test::test_merge(&mut [&first, &second]);
	assert_eq!(merged.total(), &31);

	ctx_a.new_session();
	ctx_b.new_session();

	let mut fork = merged.clone();
	fork.add(&mut ctx_b, 32);

	merged.add(&mut ctx_a, 64);

	// Only the cell written after `seq_pre_merge` survives the clean.
	crate::Mergable::clean(&mut merged, &seq_pre_merge);
	assert_eq!(merged.live.len(), 1);

	// 31 + 32 + 64
	let mut combined = crate::test::test_merge(&mut [&merged, &fork]);
	assert_eq!(combined.total(), &127);

	// Cleaning past every change drops all cells but keeps the total.
	let seq_final = ctx_a.next_sequence().max(ctx_b.next_sequence());
	crate::Mergable::clean(&mut combined, &seq_final);
	assert_eq!(combined.live.len(), 0);
	assert_eq!(combined.total(), &127);
}
303
// Randomly generated steps for the property test; the weights make additions
// the most common step and cleaning the rarest.
#[cfg(test)]
#[derive(Debug, proptest_derive::Arbitrary)]
enum Event {
	// Add `value` on one of the five actors, optionally starting a new session first.
	#[proptest(weight=90)]
	Add {
		#[proptest(strategy="0..5usize")]
		actor: usize,
		new_session: bool,
		value: std::num::Wrapping<i64>,
	},
	// Merge actor `src`'s counter into actor `dst`'s counter.
	#[proptest(weight=9)]
	Merge {
		#[proptest(strategy="0..5usize")]
		src: usize,
		#[proptest(strategy="0..5usize")]
		dst: usize,
	},
	// Synchronise all actors and clean each of their counters.
	#[proptest(weight=1)]
	Clean,
}
324
#[cfg(test)]
proptest::proptest!{
	#[test]
	fn proptest(events: Vec<Event>) {
		// Reference sum of every value added by any actor.
		let mut total = std::num::Wrapping::<i64>(0);

		// Five independent actors, each with its own context and counter.
		let mut actors = [(); 5].map(|()| {
			let mut ctx = crate::Context::default();
			let counter = Counter::new(&mut ctx, total);
			(ctx, counter)
		});

		for event in events {
			match event {
				Event::Add{actor, new_session, value} => {
					let (ctx, counter) = &mut actors[actor];
					if new_session {
						ctx.new_session();
					}
					total += value;
					counter.add(ctx, value);
				}
				Event::Merge{src, dst} => {
					// Merging an actor into itself is skipped.
					if src != dst {
						let (from, into) = crate::get_2_mut(&mut actors, src, dst);
						crate::Mergable::merge(&mut into.1, from.1.clone());
					}
				}
				Event::Clean => {
					// Forward sweep: fold each actor into its successor.
					for idx in 1..actors.len() {
						let (from, into) = crate::get_2_mut(&mut actors, idx - 1, idx);
						crate::Mergable::merge(&mut into.1, from.1.clone());
					}
					// Backward sweep: fold each actor into its predecessor, then
					// clean it up to a sequence taken from its own context.
					for idx in (0..actors.len()).rev() {
						if idx > 0 {
							let (into, from) = crate::get_2_mut(&mut actors, idx - 1, idx);
							crate::Mergable::merge(&mut into.1, from.1.clone());
						}

						let (ctx, counter) = &mut actors[idx];
						crate::Mergable::clean(counter, &ctx.next_sequence());
					}
				}
			}
		}

		// Merging every actor's counter must reproduce the reference sum.
		let mut refs: Vec<_> = actors.iter().map(|(_, counter)| counter).collect();
		let merged = crate::test::test_merge(&mut refs);
		proptest::prop_assert_eq!(merged.total(), &total);
	}
}