use std::sync::Arc;
use uni_crdt::registry_dispatch::op_from_bytes;
use uni_crdt::{Crdt, GCounter, ORSet};
use uni_plugin::traits::crdt::{CrdtKind, CrdtKindProvider, CrdtOp, CrdtState, ScalarValue};
use uni_plugin::{Capability, CapabilitySet, FnError, PluginId, PluginRegistrar, PluginRegistry};
struct NativeGCounterProvider;
impl CrdtKindProvider for NativeGCounterProvider {
fn kind(&self) -> CrdtKind {
CrdtKind::new("uni-crdt:g-counter")
}
fn empty(&self) -> Box<dyn CrdtState> {
Box::new(NativeCrdtState {
inner: Crdt::GCounter(GCounter::new()),
})
}
fn from_persisted(&self, bytes: &[u8]) -> Result<Box<dyn CrdtState>, FnError> {
let inner = Crdt::from_msgpack(bytes)
.map_err(|e| FnError::new(0x900, format!("g-counter from_persisted: {e}")))?;
Ok(Box::new(NativeCrdtState { inner }))
}
}
struct NativeOrSetProvider;
impl CrdtKindProvider for NativeOrSetProvider {
fn kind(&self) -> CrdtKind {
CrdtKind::new("uni-crdt:or-set")
}
fn empty(&self) -> Box<dyn CrdtState> {
Box::new(NativeCrdtState {
inner: Crdt::ORSet(ORSet::new()),
})
}
fn from_persisted(&self, bytes: &[u8]) -> Result<Box<dyn CrdtState>, FnError> {
let inner = Crdt::from_msgpack(bytes)
.map_err(|e| FnError::new(0x901, format!("or-set from_persisted: {e}")))?;
Ok(Box::new(NativeCrdtState { inner }))
}
}
struct NativeCrdtState {
inner: Crdt,
}
impl CrdtState for NativeCrdtState {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn apply(&mut self, _op: &CrdtOp) -> Result<(), FnError> {
Ok(())
}
fn merge(&mut self, other: &dyn CrdtState) -> Result<(), FnError> {
let other = other
.as_any()
.downcast_ref::<NativeCrdtState>()
.ok_or_else(|| FnError::new(0x910, "native merge: state type mismatch"))?;
self.inner
.try_merge(&other.inner)
.map_err(|e| FnError::new(0x911, format!("native merge: {e}")))
}
fn value(&self) -> Result<ScalarValue, FnError> {
Ok(ScalarValue::Utf8(Some(self.inner.type_name().to_owned())))
}
fn persist(&self) -> Result<Vec<u8>, FnError> {
self.inner
.to_msgpack()
.map_err(|e| FnError::new(0x912, format!("native persist: {e}")))
}
}
fn register_native_providers(registry: &PluginRegistry) {
let caps = CapabilitySet::from_iter_of([Capability::Crdt]);
let mut r = PluginRegistrar::new(PluginId::new("uni-crdt.native"), &caps, registry);
r.crdt_kind(
CrdtKind::new("uni-crdt:g-counter"),
Arc::new(NativeGCounterProvider),
)
.unwrap();
r.crdt_kind(
CrdtKind::new("uni-crdt:or-set"),
Arc::new(NativeOrSetProvider),
)
.unwrap();
r.commit_to_registry().unwrap();
}
#[test]
fn registry_dispatch_g_counter_merges_through_provider() {
let registry = PluginRegistry::new();
register_native_providers(®istry);
let mut a = GCounter::new();
a.increment("r1", 5);
a.increment("r2", 2);
let mut b = GCounter::new();
b.increment("r2", 10);
b.increment("r3", 4);
let mut expected = Crdt::GCounter(a.clone());
expected.try_merge(&Crdt::GCounter(b.clone())).unwrap();
let mut actual = Crdt::GCounter(a);
actual
.merge_via_registry(&Crdt::GCounter(b), ®istry)
.expect("registry-dispatched merge");
assert_eq!(actual, expected);
match actual {
Crdt::GCounter(c) => assert_eq!(c.value(), 5 + 10 + 4),
other => panic!("expected GCounter, got {other:?}"),
}
}
#[test]
fn registry_dispatch_or_set_merges_through_provider() {
let registry = PluginRegistry::new();
register_native_providers(®istry);
let mut a = ORSet::<String>::new();
a.add("apple".to_owned());
a.add("banana".to_owned());
let mut b = ORSet::<String>::new();
b.add("cherry".to_owned());
let mut expected = Crdt::ORSet(a.clone());
expected.try_merge(&Crdt::ORSet(b.clone())).unwrap();
let mut actual = Crdt::ORSet(a);
actual
.merge_via_registry(&Crdt::ORSet(b), ®istry)
.expect("registry-dispatched merge");
assert_eq!(actual, expected);
match actual {
Crdt::ORSet(s) => {
let mut elts = s.elements();
elts.sort();
assert_eq!(elts, vec!["apple", "banana", "cherry"]);
}
other => panic!("expected ORSet, got {other:?}"),
}
}
#[test]
fn registry_dispatch_falls_back_when_no_provider_registered() {
let registry = PluginRegistry::new();
let mut a = GCounter::new();
a.increment("r1", 3);
let mut b = GCounter::new();
b.increment("r2", 7);
let mut x = Crdt::GCounter(a);
x.merge_via_registry(&Crdt::GCounter(b), ®istry)
.expect("native fallback");
match x {
Crdt::GCounter(c) => assert_eq!(c.value(), 10),
other => panic!("expected GCounter, got {other:?}"),
}
}
#[test]
fn op_helper_wraps_bytes_into_crdt_op() {
let op = op_from_bytes(vec![1, 2, 3]);
assert_eq!(op.bytes, vec![1, 2, 3]);
}