1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use std::cmp::Ordering;

use serde::{Deserialize, Serialize};

use crate::time::now_msec;

use crate::crdt::crdt::*;

/// Last Write Win (LWW)
///
/// An LWW CRDT associates a timestamp with a value, in order to implement a
/// time-based reconciliation rule: the most recent write wins.
/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
/// with the same timestamp but different values.
///
/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
/// keep, the merge rule of the inner CRDT is applied on the wrapped values.  (Note that all types
/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value.
/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
/// but different inner values, as the rule to keep the maximum value isn't generally the desired
/// semantics.)
///
/// As multiple computers clocks are always desynchronized,
/// when operations are close enough, it is equivalent to
/// take one copy and drop the other one.
///
/// Given that clocks are not too desynchronized, this assumption
/// is enough for most cases, as there is few chance that two humans
/// coordonate themself faster than the time difference between two NTP servers.
///
/// As a more concret example, let's suppose you want to upload a file
/// with the same key (path) in the same bucket at the very same time.
/// For each request, the file will be timestamped by the receiving server
/// and may differ from what you observed with your atomic clock!
///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in enterprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct Lww<T> {
	ts: u64,
	v: T,
}

impl<T> Lww<T>
where
	T: Crdt,
{
	/// Creates a new CRDT
	///
	/// CRDT's internal timestamp is set with current node's clock.
	pub fn new(value: T) -> Self {
		Self {
			ts: now_msec(),
			v: value,
		}
	}

	/// Build a new CRDT from a previous non-compatible one
	///
	/// Compared to new, the CRDT's timestamp is not set to now
	/// but must be set to the previous, non-compatible, CRDT's timestamp.
	pub fn migrate_from_raw(ts: u64, value: T) -> Self {
		Self { ts, v: value }
	}

	/// Update the LWW CRDT while keeping some causal ordering.
	///
	/// The timestamp of the LWW CRDT is updated to be the current node's clock
	/// at time of update, or the previous timestamp + 1 if that's bigger,
	/// so that the new timestamp is always strictly larger than the previous one.
	/// This ensures that merging the update with the old value will result in keeping
	/// the updated value.
	pub fn update(&mut self, new_value: T) {
		self.ts = std::cmp::max(self.ts + 1, now_msec());
		self.v = new_value;
	}

	/// Get the timestamp currently associated with the value
	pub fn timestamp(&self) -> u64 {
		self.ts
	}

	/// Get the CRDT value
	pub fn get(&self) -> &T {
		&self.v
	}

	/// Take the CRDT value
	pub fn take(self) -> T {
		self.v
	}

	/// Get a mutable reference to the CRDT's value
	///
	/// This is usefull to mutate the inside value without changing the LWW timestamp.
	/// When such mutation is done, the merge between two LWW values is done using the inner
	/// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large
	/// data type, such as a map, and we only want to change a single item in the map.
	/// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
	/// This delta consists in a LWW with the same timestamp, and the map
	/// inside only contains the updated value.
	/// The advantage of such a delta is that it is much smaller than the whole map.
	///
	/// Avoid using this if the inner data type is a primitive type such as a number or a string,
	/// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
	/// of both values.
	pub fn get_mut(&mut self) -> &mut T {
		&mut self.v
	}
}

impl<T> Crdt for Lww<T>
where
	T: Clone + Crdt,
{
	fn merge(&mut self, other: &Self) {
		match other.ts.cmp(&self.ts) {
			Ordering::Greater => {
				self.ts = other.ts;
				self.v = other.v.clone();
			}
			Ordering::Equal => {
				self.v.merge(&other.v);
			}
			Ordering::Less => (),
		}
	}
}