#ifndef CRDT_HPP
#define CRDT_HPP
#include <cstdint>
#ifndef CRDT_COLLECTIONS_DEFINED
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <map>
#include <set>
#include <vector>
template <typename T> using CrdtVector = std::vector<T>;
using CrdtKey = std::string;
template <typename K, typename V, typename Hash = std::hash<K>, typename KeyEqual = std::equal_to<K>>
using CrdtMap = std::unordered_map<K, V, Hash, KeyEqual>;
template <typename K, typename V, typename Comparator = std::less<K>> using CrdtSortedMap = std::map<K, V, Comparator>;
template <typename K, typename Hash = std::hash<K>, typename KeyEqual = std::equal_to<K>>
using CrdtSet = std::unordered_set<K, Hash, KeyEqual>;
template <typename K, typename V>
using CrdtTombstoneMap = std::unordered_map<K, V>;
template <typename T, typename Comparator> using CrdtSortedSet = std::set<T, Comparator>;
using CrdtNodeId = uint64_t;
#endif
#include <algorithm>
#include <iostream>
#include <optional>
#include <memory>
#include <type_traits>
#include <functional>
#include <variant>
template <typename T, typename = void> struct has_emplace_back : std::false_type {};
template <typename T>
struct has_emplace_back<T, std::void_t<decltype(std::declval<T>().emplace_back(std::declval<typename T::value_type>()))>>
: std::true_type {};
template <typename Container, typename Element> void add_to_container(Container &container, Element &&element) {
if constexpr (has_emplace_back<Container>::value) {
container.emplace_back(std::forward<Element>(element));
} else {
container.emplace(std::forward<Element>(element));
}
}
template <typename K, typename V> struct Change {
K record_id;
std::optional<CrdtKey> col_name; std::optional<V> value; uint64_t col_version;
uint64_t db_version;
CrdtNodeId node_id;
uint64_t local_db_version;
uint32_t flags;
Change() = default;
Change(K rid, std::optional<CrdtKey> cname, std::optional<V> val, uint64_t cver, uint64_t dver, CrdtNodeId nid,
uint64_t ldb_ver = 0, uint32_t f = 0)
: record_id(std::move(rid)), col_name(std::move(cname)), value(std::move(val)), col_version(cver), db_version(dver),
node_id(nid), local_db_version(ldb_ver), flags(f) {}
};
template <typename Rule, typename K, typename V, typename Context = void>
concept MergeRule =
(std::is_void_v<Context> &&
requires(Rule r, const Change<K, V> &local, const Change<K, V> &remote) {
{ r(local, remote) } -> std::convertible_to<bool>;
}) ||
(!std::is_void_v<Context> && requires(Rule r, const Change<K, V> &local, const Change<K, V> &remote, const Context &ctx) {
{ r(local, remote, ctx) } -> std::convertible_to<bool>;
});
template <typename K, typename V, typename Context = void> struct DefaultMergeRule {
constexpr bool operator()(uint64_t local_col, uint64_t local_db, const CrdtNodeId &local_node, uint64_t remote_col,
uint64_t remote_db, const CrdtNodeId &remote_node) const {
if (remote_col > local_col) {
return true;
} else if (remote_col < local_col) {
return false;
} else {
if (remote_db > local_db) {
return true;
} else if (remote_db < local_db) {
return false;
} else {
return (remote_node > local_node);
}
}
}
constexpr bool operator()(const Change<K, V> &local, const Change<K, V> &remote) const {
return (*this)(local.col_version, local.db_version, local.node_id, remote.col_version, remote.db_version, remote.node_id);
}
};
template <typename K, typename V, typename Context>
requires(!std::is_void_v<Context>)
struct DefaultMergeRule<K, V, Context> {
constexpr bool operator()(uint64_t local_col, uint64_t local_db, uint64_t local_node, uint64_t remote_col, uint64_t remote_db,
uint64_t remote_node, const Context &) const {
DefaultMergeRule<K, V, void> default_rule;
return default_rule(local_col, local_db, local_node, remote_col, remote_db, remote_node);
}
constexpr bool operator()(const Change<K, V> &local, const Change<K, V> &remote, const Context &ctx) const {
return (*this)(local.col_version, local.db_version, local.node_id, remote.col_version, remote.db_version, remote.node_id,
ctx);
}
};
template <typename Comparator, typename K, typename V>
concept ChangeComparator = requires(Comparator c, const Change<K, V> &a, const Change<K, V> &b) {
{ c(a, b) } -> std::convertible_to<bool>;
};
template <typename K, typename V> struct DefaultChangeComparator {
constexpr bool operator()(const Change<K, V> &a, const Change<K, V> &b) const {
if (a.record_id != b.record_id)
return a.record_id < b.record_id;
if (a.col_name.has_value() != b.col_name.has_value())
return b.col_name.has_value(); if (a.col_name != b.col_name)
return a.col_name < b.col_name;
if (a.col_version != b.col_version)
return a.col_version > b.col_version;
if (a.db_version != b.db_version)
return a.db_version > b.db_version;
if (a.node_id != b.node_id)
return a.node_id > b.node_id;
return false; }
};
struct DefaultSort {
template <typename Iterator, typename Comparator>
constexpr void operator()(Iterator begin, Iterator end, Comparator comp) const {
std::sort(begin, end, comp);
}
};
class LogicalClock {
public:
LogicalClock() : time_(0) {}
constexpr uint64_t tick() { return ++time_; }
constexpr uint64_t update(uint64_t received_time) {
time_ = std::max(time_, received_time);
return ++time_;
}
constexpr void set_time(uint64_t t) { time_ = t; }
constexpr uint64_t current_time() const { return time_; }
private:
uint64_t time_;
};
struct ColumnVersion {
uint64_t col_version;
uint64_t db_version;
CrdtNodeId node_id;
uint64_t local_db_version;
constexpr ColumnVersion(uint64_t c, uint64_t d, CrdtNodeId n, uint64_t ldb_ver = 0)
: col_version(c), db_version(d), node_id(n), local_db_version(ldb_ver) {}
};
struct TombstoneInfo {
uint64_t db_version;
CrdtNodeId node_id;
uint64_t local_db_version;
constexpr TombstoneInfo(uint64_t d, CrdtNodeId n, uint64_t ldb_ver)
: db_version(d), node_id(n), local_db_version(ldb_ver) {}
constexpr ColumnVersion as_column_version() const {
return ColumnVersion(UINT64_MAX, db_version, node_id, local_db_version);
}
};
template <typename K> struct CompactTombstoneEntry {
K first;
ColumnVersion second;
constexpr CompactTombstoneEntry(K id, ColumnVersion inf) : first(std::move(id)), second(inf) {}
};
template <typename K> class TombstoneStorage {
private:
CrdtTombstoneMap<K, TombstoneInfo> entries_;
public:
void insert_or_assign(const K &key, const TombstoneInfo &info) { entries_.insert_or_assign(key, info); }
std::optional<TombstoneInfo> find(const K &key) const {
auto it = entries_.find(key);
if (it != entries_.end()) {
return it->second;
}
return std::nullopt;
}
bool erase(const K &key) { return entries_.erase(key); }
void clear() { entries_.clear(); }
auto begin() const { return entries_.begin(); }
auto end() const { return entries_.end(); }
size_t size() const { return entries_.size(); }
size_t compact(uint64_t min_acknowledged_version) {
size_t removed_count = 0;
for (auto it = entries_.begin(); it != entries_.end();) {
if (it->second.db_version < min_acknowledged_version) {
it = entries_.erase(it);
++removed_count;
} else {
++it;
}
}
return removed_count;
}
};
template <typename V> struct Record {
CrdtMap<CrdtKey, V> fields;
CrdtMap<CrdtKey, ColumnVersion> column_versions;
uint64_t lowest_local_db_version = UINT64_MAX;
uint64_t highest_local_db_version = 0;
Record() = default;
Record(CrdtMap<CrdtKey, V> &&f, CrdtMap<CrdtKey, ColumnVersion> &&cv) : fields(std::move(f)), column_versions(std::move(cv)) {
for (const auto &[_, ver] : column_versions) {
if (ver.local_db_version < lowest_local_db_version) {
lowest_local_db_version = ver.local_db_version;
}
if (ver.local_db_version > highest_local_db_version) {
highest_local_db_version = ver.local_db_version;
}
}
}
};
template <typename V> constexpr bool operator==(const Record<V> &lhs, const Record<V> &rhs) {
if (lhs.fields.size() != rhs.fields.size())
return false;
for (const auto &[key, value] : lhs.fields) {
auto it = rhs.fields.find(key);
if (it == rhs.fields.end() || it->second != value)
return false;
}
return true;
}
template <typename Container, typename Key, typename Value>
concept MapLike = requires(Container c, Key k, Value v) {
typename Container::key_type;
typename Container::mapped_type;
typename Container::value_type;
typename Container::iterator;
{ c[k] } -> std::convertible_to<Value &>;
{ c.find(k) } -> std::convertible_to<typename Container::iterator>;
{ c.emplace(k, v) };
{ c.try_emplace(k, v) } -> std::same_as<std::pair<typename Container::iterator, bool>>;
{ c.insert_or_assign(k, v) } -> std::same_as<std::pair<typename Container::iterator, bool>>;
{ c.clear() } -> std::same_as<void>;
{ c.erase(k) };
};
template <typename K, typename V, typename SortFunctionType = DefaultSort, MapLike<K, Record<V>> MapType = CrdtMap<K, Record<V>>>
class CRDT : public std::enable_shared_from_this<CRDT<K, V, SortFunctionType, MapType>> {
protected:
CrdtNodeId node_id_;
LogicalClock clock_;
MapType data_;
TombstoneStorage<K> tombstones_;
std::shared_ptr<CRDT> parent_;
uint64_t base_version_; SortFunctionType sort_func_;
public:
CRDT(CrdtNodeId node_id, std::shared_ptr<CRDT> parent = nullptr, SortFunctionType sort_func = SortFunctionType())
: node_id_(node_id), clock_(), data_(), tombstones_(), parent_(parent), sort_func_(std::move(sort_func)) {
if (parent_) {
clock_ = parent_->clock_;
base_version_ = parent_->clock_.current_time();
} else {
base_version_ = 0;
}
}
constexpr CRDT(CrdtNodeId node_id, CrdtVector<Change<K, V>> &&changes) : node_id_(node_id), clock_(), data_(), tombstones_() {
apply_changes(std::move(changes));
}
constexpr void reset(CrdtVector<Change<K, V>> &&changes) {
data_.clear();
tombstones_.clear();
clock_ = LogicalClock();
apply_changes(std::move(changes));
}
constexpr CrdtVector<Change<K, V>> revert() {
if (!parent_) {
throw std::runtime_error("Cannot revert without a parent CRDT.");
}
CrdtVector<Change<K, V>> child_changes = this->get_changes_since(base_version_);
return invert_changes(child_changes, *parent_);
}
constexpr CrdtVector<Change<K, V>> diff(const CRDT &other) const {
uint64_t common_version = std::min(clock_.current_time(), other.clock_.current_time());
CrdtVector<Change<K, V>> this_changes = this->get_changes_since(common_version);
CrdtVector<Change<K, V>> other_changes = other.get_changes_since(common_version);
CrdtVector<Change<K, V>> inverted_this_changes = invert_changes(this_changes, other);
CrdtVector<Change<K, V>> diff_changes;
diff_changes.reserve(inverted_this_changes.size() + other_changes.size());
diff_changes.insert(diff_changes.end(), inverted_this_changes.begin(), inverted_this_changes.end());
diff_changes.insert(diff_changes.end(), other_changes.begin(), other_changes.end());
compress_changes(diff_changes);
return diff_changes;
}
template <typename... Pairs> constexpr void insert_or_update(const K &record_id, Pairs &&...pairs) {
insert_or_update_impl<CrdtVector<Change<K, V>>>(record_id, 0, nullptr, std::forward<Pairs>(pairs)...);
}
template <typename ChangeContainer, typename... Pairs>
constexpr void insert_or_update(const K &record_id, ChangeContainer &changes, Pairs &&...pairs) {
insert_or_update_impl(record_id, 0, &changes, std::forward<Pairs>(pairs)...);
}
template <typename... Pairs> constexpr void insert_or_update(const K &record_id, uint32_t flags, Pairs &&...pairs) {
insert_or_update_impl<CrdtVector<Change<K, V>>>(record_id, flags, nullptr, std::forward<Pairs>(pairs)...);
}
template <typename ChangeContainer, typename... Pairs>
constexpr void insert_or_update(const K &record_id, uint32_t flags, ChangeContainer &changes, Pairs &&...pairs) {
insert_or_update_impl(record_id, flags, &changes, std::forward<Pairs>(pairs)...);
}
template <typename Container> constexpr void insert_or_update_from_container(const K &record_id, Container &&fields) {
insert_or_update_from_container_impl<CrdtVector<Change<K, V>>>(record_id, 0, std::forward<Container>(fields), nullptr);
}
template <typename Container>
constexpr void insert_or_update_from_container(const K &record_id, uint32_t flags, Container &&fields) {
insert_or_update_from_container_impl<CrdtVector<Change<K, V>>>(record_id, flags, std::forward<Container>(fields), nullptr);
}
template <typename Container, typename ChangeContainer>
constexpr void insert_or_update_from_container(const K &record_id, Container &&fields, ChangeContainer &changes) {
insert_or_update_from_container_impl(record_id, 0, std::forward<Container>(fields), &changes);
}
template <typename Container, typename ChangeContainer>
constexpr void insert_or_update_from_container(const K &record_id, uint32_t flags, Container &&fields,
ChangeContainer &changes) {
insert_or_update_from_container_impl(record_id, flags, std::forward<Container>(fields), &changes);
}
virtual void delete_record(const K &record_id, uint32_t flags = 0) {
delete_record_impl<CrdtVector<Change<K, V>>>(record_id, flags, nullptr);
}
template <typename ChangeContainer> void delete_record(const K &record_id, ChangeContainer &changes, uint32_t flags = 0) {
delete_record_impl(record_id, flags, &changes);
}
virtual bool delete_field(const K &record_id, const CrdtKey &field_name, uint32_t flags = 0) {
return delete_field_impl<CrdtVector<Change<K, V>>>(record_id, field_name, flags, nullptr);
}
template <typename ChangeContainer>
bool delete_field(const K &record_id, const CrdtKey &field_name, ChangeContainer &changes, uint32_t flags = 0) {
return delete_field_impl(record_id, field_name, flags, &changes);
}
virtual CrdtVector<Change<K, V>> get_changes_since(uint64_t last_db_version, CrdtSet<CrdtNodeId> excluding = {}) const {
CrdtVector<Change<K, V>> changes;
if (parent_) {
auto parent_changes = parent_->get_changes_since(last_db_version);
changes.insert(changes.end(), parent_changes.begin(), parent_changes.end());
}
for (const auto &[record_id, record] : data_) {
if (record.highest_local_db_version <= last_db_version) {
continue;
}
for (const auto &[col_name, clock_info] : record.column_versions) {
if (clock_info.local_db_version > last_db_version && !excluding.contains(clock_info.node_id)) {
std::optional<V> value = std::nullopt;
std::optional<CrdtKey> name = col_name;
auto field_it = record.fields.find(col_name);
if (field_it != record.fields.end()) {
value = field_it->second;
}
changes.emplace_back(Change<K, V>(record_id, std::move(name), std::move(value), clock_info.col_version,
clock_info.db_version, clock_info.node_id, clock_info.local_db_version));
}
}
}
for (const auto &entry : tombstones_) {
const auto &record_id = entry.first;
const auto &tombstone_info = entry.second;
if (tombstone_info.local_db_version > last_db_version && !excluding.contains(tombstone_info.node_id)) {
changes.emplace_back(Change<K, V>(record_id, std::nullopt, std::nullopt, 1, tombstone_info.db_version,
tombstone_info.node_id, tombstone_info.local_db_version));
}
}
if (parent_) {
compress_changes(changes);
}
return changes;
}
template <bool ReturnAcceptedChanges = false, typename MergeContext = void,
MergeRule<K, V, MergeContext> MergeRuleType = DefaultMergeRule<K, V, MergeContext>>
std::conditional_t<ReturnAcceptedChanges, CrdtVector<Change<K, V>>, void>
merge_changes(CrdtVector<Change<K, V>> &&changes, bool ignore_parent = false, MergeRuleType merge_rule = MergeRuleType(),
std::conditional_t<std::is_void_v<MergeContext>,
std::monostate, MergeContext>
context = {}) {
CrdtVector<Change<K, V>> accepted_changes;
if (changes.empty()) {
if constexpr (ReturnAcceptedChanges) {
return accepted_changes;
} else {
return;
}
}
for (auto &&change : changes) {
const K &record_id = change.record_id;
std::optional<CrdtKey> col_name = std::move(change.col_name);
uint64_t remote_col_version = change.col_version;
uint64_t remote_db_version = change.db_version;
CrdtNodeId remote_node_id = change.node_id;
std::optional<V> remote_value = std::move(change.value);
uint32_t flags = change.flags;
uint64_t new_local_db_version = clock_.update(remote_db_version);
if (is_record_tombstoned(record_id, ignore_parent)) {
continue;
}
const Record<V> *record_ptr = get_record_ptr(record_id, ignore_parent);
const ColumnVersion *local_col_info = nullptr;
if (!col_name) {
if (auto tombstone_info = tombstones_.find(record_id)) {
static thread_local ColumnVersion temp_col_version{0, 0, {}, 0};
temp_col_version = tombstone_info->as_column_version();
local_col_info = &temp_col_version;
}
} else if (record_ptr != nullptr) {
auto col_it = record_ptr->column_versions.find(*col_name);
if (col_it != record_ptr->column_versions.end()) {
local_col_info = &col_it->second;
}
}
bool should_accept = false;
if (local_col_info == nullptr) {
should_accept = true;
} else {
if constexpr (std::is_void_v<MergeContext>) {
should_accept = merge_rule(local_col_info->col_version, local_col_info->db_version, local_col_info->node_id,
remote_col_version, remote_db_version, remote_node_id);
} else {
should_accept = merge_rule(local_col_info->col_version, local_col_info->db_version, local_col_info->node_id,
remote_col_version, remote_db_version, remote_node_id, context);
}
}
if (should_accept) {
if (!col_name) {
data_.erase(record_id);
tombstones_.insert_or_assign(
record_id, TombstoneInfo(remote_db_version, remote_node_id, new_local_db_version));
if constexpr (ReturnAcceptedChanges) {
accepted_changes.emplace_back(Change<K, V>(record_id, std::nullopt, std::nullopt, remote_col_version,
remote_db_version, remote_node_id, new_local_db_version, flags));
}
} else {
Record<V> &record = get_or_create_record_unchecked(record_id, ignore_parent);
if (remote_value.has_value()) {
if constexpr (ReturnAcceptedChanges) {
record.fields[*col_name] = *remote_value;
} else {
record.fields[*col_name] = std::move(*remote_value);
}
} else {
record.fields.erase(*col_name);
}
if constexpr (ReturnAcceptedChanges) {
record.column_versions.insert_or_assign(
*col_name, ColumnVersion(remote_col_version, remote_db_version, remote_node_id, new_local_db_version));
if (new_local_db_version < record.lowest_local_db_version) {
record.lowest_local_db_version = new_local_db_version;
}
if (new_local_db_version > record.highest_local_db_version) {
record.highest_local_db_version = new_local_db_version;
}
accepted_changes.emplace_back(Change<K, V>(record_id, std::move(col_name), std::move(remote_value),
remote_col_version, remote_db_version, remote_node_id,
new_local_db_version, flags));
} else {
record.column_versions.insert_or_assign(
std::move(*col_name), ColumnVersion(remote_col_version, remote_db_version, remote_node_id, new_local_db_version));
if (new_local_db_version < record.lowest_local_db_version) {
record.lowest_local_db_version = new_local_db_version;
}
if (new_local_db_version > record.highest_local_db_version) {
record.highest_local_db_version = new_local_db_version;
}
}
}
}
}
if constexpr (ReturnAcceptedChanges) {
return accepted_changes;
}
}
template <bool Sorted = false> static void compress_changes(CrdtVector<Change<K, V>> &changes) {
if (changes.empty())
return;
auto new_end = compress_changes<Sorted>(changes.begin(), changes.end());
changes.erase(new_end, changes.end());
}
template <bool Sorted = false, typename Iterator> static Iterator compress_changes(Iterator begin, Iterator end) {
if (begin == end)
return end;
if constexpr (!Sorted) {
SortFunctionType()(begin, end, DefaultChangeComparator<K, V>());
}
Iterator write = begin;
for (Iterator read = std::next(begin); read != end; ++read) {
if (read->record_id != write->record_id) {
++write;
if (write != read) {
*write = std::move(*read);
}
} else if (!read->col_name.has_value() && write->col_name.has_value()) {
*write = std::move(*read);
} else if (read->col_name != write->col_name && write->col_name.has_value()) {
++write;
if (write != read) {
*write = std::move(*read);
}
}
}
return std::next(write);
}
#ifndef NDEBUG
constexpr void print_data() const {
std::cout << "Node " << node_id_ << " Data:" << std::endl;
for (const auto &[record_id, record] : data_) {
std::cout << "ID: ";
print_value(record_id);
std::cout << std::endl;
for (const auto &[key, value] : record.fields) {
std::cout << " ";
print_value(key);
std::cout << ": ";
print_value(value);
std::cout << std::endl;
}
}
}
#else
constexpr void print_data() const {}
#endif
constexpr const LogicalClock &get_clock() const { return clock_; }
void update_max_clock(uint64_t new_clock) {
clock_.update(new_clock);
}
constexpr CrdtMap<K, Record<V>> get_data_combined() const {
if (!parent_) {
return data_;
}
CrdtMap<K, Record<V>> combined_data = parent_->get_data_combined();
for (const auto &entry : tombstones_) {
combined_data.erase(entry.first);
}
for (const auto &[key, record] : data_) {
combined_data[key] = record;
}
return combined_data;
}
constexpr auto &get_data() { return data_; }
constexpr Record<V> *get_record(const K &record_id, bool ignore_parent = false) {
return get_record_ptr(record_id, ignore_parent);
}
constexpr const Record<V> *get_record(const K &record_id, bool ignore_parent = false) const {
return get_record_ptr(record_id, ignore_parent);
}
constexpr bool is_tombstoned(const K &record_id, bool ignore_parent = false) const {
return is_record_tombstoned(record_id, ignore_parent);
}
constexpr std::optional<TombstoneInfo> get_tombstone(const K &record_id, bool ignore_parent = false) const {
if (auto tombstone_info = tombstones_.find(record_id)) {
return tombstone_info;
}
if (parent_ && !ignore_parent) {
return parent_->get_tombstone(record_id);
}
return std::nullopt;
}
size_t compact_tombstones(uint64_t min_acknowledged_version) {
return tombstones_.compact(min_acknowledged_version);
}
size_t tombstone_count() const {
return tombstones_.size();
}
template <typename Predicate> CrdtVector<std::pair<K, Record<V>>> query_records(Predicate &&pred) const {
CrdtVector<std::pair<K, Record<V>>> results;
for (const auto &[key, record] : data_) {
if (!is_tombstoned(key) && pred(key, record)) {
results.emplace_back(key, record);
}
}
return results;
}
template <typename Predicate, typename Projection> auto query_with_projection(Predicate &&pred, Projection &&proj) const {
using ResultType = std::invoke_result_t<Projection, K, Record<V>>;
CrdtVector<ResultType> results;
for (const auto &[key, record] : data_) {
if (!is_tombstoned(key) && pred(key, record)) {
results.push_back(proj(key, record));
}
}
return results;
}
CRDT(const CRDT &other)
: node_id_(other.node_id_), clock_(other.clock_), data_(other.data_), tombstones_(other.tombstones_),
parent_(other.parent_), base_version_(other.base_version_), sort_func_(other.sort_func_) {
}
CRDT &operator=(const CRDT &other) {
if (this != &other) {
node_id_ = other.node_id_;
clock_ = other.clock_;
data_ = other.data_;
tombstones_ = other.tombstones_;
parent_ = other.parent_;
base_version_ = other.base_version_;
sort_func_ = other.sort_func_;
}
return *this;
}
CRDT(CRDT &&other) noexcept
: node_id_(other.node_id_), clock_(std::move(other.clock_)), data_(std::move(other.data_)),
tombstones_(std::move(other.tombstones_)), parent_(std::move(other.parent_)), base_version_(other.base_version_),
sort_func_(std::move(other.sort_func_)) {}
CRDT &operator=(CRDT &&other) noexcept {
if (this != &other) {
node_id_ = other.node_id_;
clock_ = std::move(other.clock_);
data_ = std::move(other.data_);
tombstones_ = std::move(other.tombstones_);
parent_ = std::move(other.parent_);
base_version_ = other.base_version_;
sort_func_ = std::move(other.sort_func_);
}
return *this;
}
protected:
template <typename T> static void print_value(const T &value) {
if constexpr (std::is_same_v<T, std::string> || std::is_arithmetic_v<T>) {
std::cout << value;
} else {
std::cout << "[non-printable]";
}
}
void apply_changes(CrdtVector<Change<K, V>> &&changes) {
uint64_t max_db_version = 0;
for (const auto &change : changes) {
if (change.db_version > max_db_version) {
max_db_version = change.db_version;
}
if (change.local_db_version > max_db_version) {
max_db_version = change.local_db_version;
}
}
clock_.set_time(max_db_version);
for (auto &&change : changes) {
const K &record_id = change.record_id;
std::optional<CrdtKey> col_name = std::move(change.col_name);
uint64_t remote_col_version = change.col_version;
uint64_t remote_db_version = change.db_version;
CrdtNodeId remote_node_id = change.node_id;
uint64_t remote_local_db_version = change.local_db_version;
std::optional<V> remote_value = std::move(change.value);
if (!col_name.has_value()) {
data_.erase(record_id);
tombstones_.insert_or_assign(
record_id, TombstoneInfo(remote_db_version, remote_node_id, remote_local_db_version));
} else {
if (!is_record_tombstoned(record_id)) {
Record<V> &record = get_or_create_record_unchecked(record_id);
if (remote_value.has_value()) {
record.fields[*col_name] = std::move(remote_value.value());
}
record.column_versions.insert_or_assign(std::move(*col_name), ColumnVersion(remote_col_version, remote_db_version,
remote_node_id, remote_local_db_version));
if (remote_local_db_version < record.lowest_local_db_version) {
record.lowest_local_db_version = remote_local_db_version;
}
if (remote_local_db_version > record.highest_local_db_version) {
record.highest_local_db_version = remote_local_db_version;
}
}
}
}
}
constexpr bool is_record_tombstoned(const K &record_id, bool ignore_parent = false) const {
if (tombstones_.find(record_id).has_value()) {
return true;
}
if (parent_ && !ignore_parent) {
return parent_->is_record_tombstoned(record_id);
}
return false;
}
constexpr Record<V> &get_or_create_record_unchecked(const K &record_id, bool ignore_parent = false) {
auto [it, inserted] = data_.try_emplace(record_id, Record<V>());
if (inserted && parent_ && !ignore_parent) {
if (auto parent_record = parent_->get_record_ptr(record_id)) {
it->second = *parent_record;
}
}
return it->second;
}
constexpr Record<V> *get_record_ptr(const K &record_id, bool ignore_parent = false) {
auto it = data_.find(record_id);
if (it != data_.end()) {
return &(it->second);
}
if (ignore_parent) {
return nullptr;
} else {
return parent_ ? parent_->get_record_ptr(record_id) : nullptr;
}
}
constexpr const Record<V> *get_record_ptr(const K &record_id, bool ignore_parent = false) const {
auto it = data_.find(record_id);
if (it != data_.end()) {
return &(it->second);
}
if (ignore_parent) {
return nullptr;
} else {
return parent_ ? parent_->get_record_ptr(record_id) : nullptr;
}
}
CrdtVector<Change<K, V>> invert_changes(const CrdtVector<Change<K, V>> &changes, const CRDT &reference_crdt) const {
CrdtVector<Change<K, V>> inverse_changes;
for (const auto &change : changes) {
const K &record_id = change.record_id;
const std::optional<CrdtKey> &col_name = change.col_name;
if (!col_name.has_value()) {
auto record_ptr = reference_crdt.get_record(record_id);
if (record_ptr) {
std::vector<std::pair<CrdtKey, V>> sorted_fields(record_ptr->fields.begin(), record_ptr->fields.end());
std::sort(sorted_fields.begin(), sorted_fields.end(), [&](const auto &a, const auto &b) {
return record_ptr->column_versions.at(a.first).db_version < record_ptr->column_versions.at(b.first).db_version;
});
for (const auto &[ref_col, ref_val] : sorted_fields) {
inverse_changes.emplace_back(Change<K, V>(record_id, ref_col, ref_val,
record_ptr->column_versions.at(ref_col).col_version,
record_ptr->column_versions.at(ref_col).db_version, node_id_,
record_ptr->column_versions.at(ref_col).local_db_version));
}
}
} else {
CrdtKey col = *col_name;
auto record_ptr = reference_crdt.get_record(record_id);
if (record_ptr) {
auto field_it = record_ptr->fields.find(col);
if (field_it != record_ptr->fields.end()) {
inverse_changes.emplace_back(Change<K, V>(
record_id, col, field_it->second, record_ptr->column_versions.at(col).col_version,
record_ptr->column_versions.at(col).db_version, node_id_, record_ptr->column_versions.at(col).local_db_version));
} else {
inverse_changes.emplace_back(Change<K, V>(record_id, col,
std::nullopt, 0, clock_.current_time(), node_id_));
}
} else {
inverse_changes.emplace_back(Change<K, V>(record_id, std::nullopt, std::nullopt,
0, clock_.current_time(), node_id_));
}
}
}
return inverse_changes;
}
template <typename ChangeContainer, typename... Pairs>
constexpr void insert_or_update_impl(const K &record_id, uint32_t flags, ChangeContainer *changes, Pairs &&...pairs) {
uint64_t db_version = clock_.tick();
if (is_record_tombstoned(record_id)) {
return;
}
Record<V> &record = get_or_create_record_unchecked(record_id);
auto process_pair = [&](const auto &pair) {
const auto &col_name = pair.first;
const auto &value = pair.second;
uint64_t col_version;
auto col_it = record.column_versions.find(col_name);
if (col_it != record.column_versions.end()) {
col_version = ++col_it->second.col_version;
col_it->second.db_version = db_version;
col_it->second.node_id = node_id_;
col_it->second.local_db_version = db_version;
} else {
col_version = 1;
record.column_versions.emplace(col_name, ColumnVersion(col_version, db_version, node_id_, db_version));
}
if (db_version < record.lowest_local_db_version) {
record.lowest_local_db_version = db_version;
}
if (db_version > record.highest_local_db_version) {
record.highest_local_db_version = db_version;
}
if (changes) {
record.fields[col_name] = value;
add_to_container(*changes, Change<K, V>(record_id, std::move(col_name), std::move(value), col_version, db_version,
node_id_, db_version, flags));
} else {
record.fields[std::move(col_name)] = std::move(value);
}
};
(process_pair(std::forward<Pairs>(pairs)), ...);
}
template <typename ChangeContainer, typename Container>
constexpr void insert_or_update_from_container_impl(const K &record_id, uint32_t flags, Container &&fields,
ChangeContainer *changes) {
uint64_t db_version = clock_.tick();
if (is_record_tombstoned(record_id)) {
return;
}
Record<V> &record = get_or_create_record_unchecked(record_id);
for (auto &&[col_name, value] : std::forward<Container>(fields)) {
uint64_t col_version;
auto col_it = record.column_versions.find(col_name);
if (col_it != record.column_versions.end()) {
col_version = ++col_it->second.col_version;
col_it->second.db_version = db_version;
col_it->second.node_id = node_id_;
col_it->second.local_db_version = db_version;
} else {
col_version = 1;
record.column_versions.emplace(col_name, ColumnVersion(col_version, db_version, node_id_, db_version));
}
if (db_version < record.lowest_local_db_version) {
record.lowest_local_db_version = db_version;
}
if (db_version > record.highest_local_db_version) {
record.highest_local_db_version = db_version;
}
if (changes) {
record.fields[col_name] = value;
add_to_container(*changes, Change<K, V>(record_id, std::move(col_name), std::move(value), col_version, db_version,
node_id_, db_version, flags));
} else {
record.fields[std::move(col_name)] = std::move(value);
}
}
}
template <typename ChangeContainer> void delete_record_impl(const K &record_id, uint32_t flags, ChangeContainer *changes) {
if (is_record_tombstoned(record_id)) {
return;
}
uint64_t db_version = clock_.tick();
data_.erase(record_id);
tombstones_.insert_or_assign(record_id, TombstoneInfo(db_version, node_id_, db_version));
if (changes) {
add_to_container(*changes, Change<K, V>(record_id, std::nullopt, std::nullopt, 1, db_version, node_id_, db_version, flags));
}
}
template <typename ChangeContainer>
bool delete_field_impl(const K &record_id, const CrdtKey &field_name, uint32_t flags, ChangeContainer *changes) {
if (is_record_tombstoned(record_id)) {
return false;
}
auto it = data_.find(record_id);
if (it == data_.end()) {
return false;
}
Record<V> &record = it->second;
if (record.fields.find(field_name) == record.fields.end()) {
return false;
}
uint64_t db_version = clock_.tick();
uint64_t col_version;
auto col_it = record.column_versions.find(field_name);
if (col_it != record.column_versions.end()) {
col_version = ++col_it->second.col_version;
col_it->second.db_version = db_version;
col_it->second.node_id = node_id_;
col_it->second.local_db_version = db_version;
} else {
col_version = 1;
record.column_versions.emplace(field_name, ColumnVersion(col_version, db_version, node_id_, db_version));
}
if (db_version < record.lowest_local_db_version) {
record.lowest_local_db_version = db_version;
}
if (db_version > record.highest_local_db_version) {
record.highest_local_db_version = db_version;
}
record.fields.erase(field_name);
if (changes) {
add_to_container(*changes, Change<K, V>(record_id, field_name, std::nullopt, col_version, db_version, node_id_, db_version, flags));
}
return true;
}
};
template <typename K, typename V, typename SortFunctionType = DefaultSort, MapLike<K, Record<V>> MapType = CrdtMap<K, Record<V>>,
typename MergeContext = void, MergeRule<K, V, MergeContext> MergeRuleType = DefaultMergeRule<K, V, MergeContext>>
constexpr void sync_nodes(CRDT<K, V, SortFunctionType, MapType> &source, CRDT<K, V, SortFunctionType, MapType> &target,
uint64_t &last_db_version, MergeRuleType merge_rule = MergeRuleType(),
std::conditional_t<std::is_void_v<MergeContext>,
std::monostate, MergeContext>
context = {}) {
auto changes = source.get_changes_since(last_db_version);
uint64_t max_version = last_db_version;
for (const auto &change : changes) {
if (change.db_version > max_version) {
max_version = change.db_version;
}
}
if (max_version > last_db_version) {
last_db_version = max_version;
}
target.merge_changes(std::move(changes), false, merge_rule, context);
}
#endif