1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/** An aggregate built from one `Agg` value per session.
 *
 * `update` writes to the cell keyed by the caller's session, and `value` joins every cell in `live` into a single result.
 */
#[derive(Clone)]
pub struct Aggregated<Agg: Aggregatable, Seq> {
	/** Cells whose last modification is older than this are dropped during `merge` and `apply`, and skipped by `diff`. While this is `None` no cell is dropped, because `None` orders before every `Some`. */
	rollup_threshold: Option<Seq>,
	/** The current cell of each session, keyed by the session that wrote it. */
	live: std::collections::HashMap<crate::Session, crate::Cell<Agg, Seq>>,
}

/** A value that can be combined with other values of its own type.
 *
 * `Aggregated` stores one such value per session and combines them with `join` / `join_ref`.
 */
pub trait Aggregatable: Clone {
	/** Fold `that` into `this`, consuming `that`. */
	fn join(this: &mut Self, that: Self);

	/** Fold a borrowed `that` into `this`.
	 *
	 * The default implementation clones `that` and forwards to `join`. Override it when joining from a reference can avoid the clone.
	 */
	fn join_ref(this: &mut Self, that: &Self) {
		let owned = that.clone();
		Self::join(this, owned);
	}

	/** Normalize a value whose `join` is sensitive to ordering.
	 *
	 * If `join` is not associative, this hook can be used to ensure that the same result is obtained irrespective of the join order. The trait *can* still be used when `canonicalize(join(a, b))` is not identical to `canonicalize(join(b, a))`, but then different actors will see different results.
	 *
	 * This is not necessarily called after every join; it is only called before the value is returned to the client. If you need canonicalization after every join, do it inside `join`.
	 *
	 * The default implementation does nothing.
	 */
	fn canonicalize(_: &mut Self) {}
}

impl<
	Agg: Aggregatable,
	SF: crate::SequenceFactory,
>
	Aggregated<Agg, crate::Sequence<SF>>
{
	/** Fold `value` into the cell belonging to the session of `ctx`.
	 *
	 * The first update from a session creates its cell from `value`; every later update is joined into the existing cell with `Agg::join`.
	 */
	pub fn update(&mut self, ctx: &mut crate::Context<SF>, value: Agg) {
		use std::collections::hash_map::Entry;

		let session = ctx.session();
		match self.live.entry(session) {
			Entry::Vacant(slot) => {
				// First contribution from this session.
				slot.insert(crate::Cell::new(ctx, value));
			}
			Entry::Occupied(slot) => {
				let cell = slot.into_mut();
				Agg::join(cell.borrow_mut(ctx), value);
			}
		}
	}
}

impl<Agg: Aggregatable, Seq> Aggregated<Agg, Seq> {
	/** Compute the aggregate by joining every live cell.
	 *
	 * The first cell is cloned, every remaining cell is folded in with `Agg::join_ref`, and the result is passed through `Agg::canonicalize` before being returned. Cells are visited in the map's iteration order, which is unspecified.
	 *
	 * # Panics
	 *
	 * Panics if there are no live cells (for example on a freshly defaulted value), because there is no value to start the join from.
	 */
	pub fn value(&self) -> Agg {
		let mut cells = self.live.values();
		// Coerce to `&Agg` explicitly so that it is the aggregate, not the cell wrapping it, that gets cloned.
		let first: &Agg = cells
			.next()
			.expect("Aggregated::value() requires at least one live cell");
		let mut total = first.clone();
		for cell in cells {
			Agg::join_ref(&mut total, cell);
		}
		Agg::canonicalize(&mut total);
		total
	}
}

impl<
	Agg: Aggregatable,
	SF: crate::SequenceFactory,
>
	crate::Mergable for Aggregated<Agg, crate::Sequence<SF>>
{
	type Diff = crate::Opaque<Self>;

	/** Fold the state of `that` into `self`.
	 *
	 * If `that` carries a higher rollup threshold it is adopted, and local cells last modified before it are dropped. Each incoming cell that is not older than the (possibly updated) threshold is then merged into the local cell of the same session, or inserted if that session has no local cell yet.
	 */
	fn merge(&mut self, that: Self) {
		use std::collections::hash_map::Entry;

		if self.rollup_threshold < that.rollup_threshold {
			self.rollup_threshold = that.rollup_threshold;
			// Borrow only the threshold field so the closure does not conflict with the mutable borrow of `live`.
			let cutoff = &self.rollup_threshold;
			self.live.retain(|_, cell| Some(cell.last_modified()) >= *cutoff);
		}

		for (session, incoming) in that.live {
			if Some(incoming.last_modified()) < self.rollup_threshold { continue }

			match self.live.entry(session) {
				Entry::Vacant(slot) => {
					slot.insert(incoming);
				}
				Entry::Occupied(slot) => {
					slot.into_mut().merge(incoming);
				}
			}
		}
	}

	/** Build an opaque diff to send to a replica in state `other`.
	 *
	 * The diff carries the larger of the two rollup thresholds and a clone of every cell of `self` that is not older than that threshold. Only the threshold of `other` is consulted; its cells are not inspected.
	 */
	fn diff(&self, other: &Self) -> Self::Diff {
		let rollup_threshold = self.rollup_threshold.clone().max(other.rollup_threshold.clone());
		let live: std::collections::HashMap<_, _> = self.live.iter()
			.filter(|(_, cell)| {
				let stale = Some(cell.last_modified()) < rollup_threshold;
				!stale
			})
			.map(|(session, cell)| (session.clone(), cell.clone()))
			.collect();
		crate::Opaque(Self {
			rollup_threshold,
			live,
		})
	}

	/** Apply a diff produced by `diff`.
	 *
	 * The diff wraps a complete `Aggregated`, so applying it is exactly a `merge` of the wrapped value. This always returns `Ok(())`.
	 */
	fn apply(&mut self, that: Self::Diff) -> Result<(), crate::ApplyError> {
		<Self as crate::Mergable>::merge(self, that.0);
		Ok(())
	}
}

impl<Agg: Aggregatable + Default, SF: crate::SequenceFactory>
	Default for Aggregated<Agg, crate::Sequence<SF>>
{
	/** Create an aggregate with no rollup threshold and no live cells. */
	fn default() -> Self {
		Self {
			rollup_threshold: None,
			live: std::collections::HashMap::new(),
		}
	}
}

impl<Agg: Aggregatable + std::cmp::PartialEq, Seq> std::cmp::PartialEq for Aggregated<Agg, Seq> {
	/** Compare two aggregates by their joined values.
	 *
	 * Only the results of `value()` are compared; which sessions contributed, and the rollup threshold, play no part.
	 *
	 * # Panics
	 *
	 * Panics if either side has no live cells, because `value()` does.
	 */
	fn eq(&self, that: &Self) -> bool {
		let ours = self.value();
		let theirs = that.value();
		ours == theirs
	}
}

impl<Agg: Aggregatable + std::fmt::Debug, Seq> std::fmt::Debug for Aggregated<Agg, Seq> {
	/** Format the aggregate as its joined value.
	 *
	 * The output is exactly the `Debug` output of `value()`, with the formatter's flags forwarded. An aggregate with no live cells is printed as `<empty>`: `value()` has nothing to join in that state and panics, which a `Debug` implementation should never do.
	 */
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		if self.live.is_empty() {
			return f.write_str("<empty>");
		}
		std::fmt::Debug::fmt(&self.value(), f)
	}
}

/** Two replicas that each record a single update merge to the join of both updates. */
#[test]
fn test() {
	let mut first_ctx = crate::Context::default();
	let mut first = Aggregated::default();
	first.update(&mut first_ctx, 5);

	let mut second_ctx = crate::Context::default();
	let mut second = Aggregated::default();
	second.update(&mut second_ctx, 7);

	let mut replicas = [first, second];
	let merged = crate::test::test_merge(&mut replicas);
	assert_eq!(merged.value(), 12);
}

/** `u8` values aggregate by addition. */
impl Aggregatable for u8 {
	/** Add `addend` to `total` in place using the built-in `+=` operator. */
	fn join(total: &mut u8, addend: u8) {
		*total += addend;
	}
}