uni_plugin/traits/crdt.rs
1//! CRDT kind plugins.
2
3pub use datafusion::scalar::ScalarValue;
4use smol_str::SmolStr;
5
6use crate::errors::FnError;
7
8/// Identifier for a CRDT kind (`"lww-register"`, `"or-set"`, …).
9#[derive(Clone, Debug, PartialEq, Eq, Hash)]
10pub struct CrdtKind(pub SmolStr);
11
12impl CrdtKind {
13 /// Construct a `CrdtKind` from a string.
14 #[must_use]
15 pub fn new(s: impl Into<SmolStr>) -> Self {
16 Self(s.into())
17 }
18}
19
20/// Opaque CRDT operation payload — encoding is CRDT-kind-specific.
21#[derive(Clone, Debug)]
22pub struct CrdtOp {
23 /// Raw operation bytes.
24 pub bytes: Vec<u8>,
25}
26
27/// A CRDT-kind provider.
28pub trait CrdtKindProvider: Send + Sync {
29 /// The CRDT kind this provider implements.
30 fn kind(&self) -> CrdtKind;
31
32 /// Construct an empty state.
33 fn empty(&self) -> Box<dyn CrdtState>;
34
35 /// Restore state from persisted bytes.
36 ///
37 /// # Errors
38 ///
39 /// Returns [`FnError`] if the bytes cannot be deserialized.
40 #[allow(
41 clippy::wrong_self_convention,
42 reason = "method belongs to the provider, not the persisted bytes"
43 )]
44 fn from_persisted(&self, bytes: &[u8]) -> Result<Box<dyn CrdtState>, FnError>;
45
46 /// Reject a reload that would tear in-flight CRDT merge state.
47 ///
48 /// Default implementation round-trips an empty `old` state through
49 /// `self.from_persisted()`. Providers that store private schema
50 /// metadata (version stamps, replica id widths) should override and
51 /// emit a richer compat check.
52 ///
53 /// A CRDT hot-swap requires that bytes produced by the **old** provider's
54 /// `persist()` are still readable by the **new** provider's
55 /// `from_persisted()`. Failing this check is a hard reload error.
56 ///
57 /// # Errors
58 ///
59 /// Returns [`FnError`] if the new provider rejects the old
60 /// provider's persisted shape.
61 fn schema_compat_check(&self, old: &dyn CrdtKindProvider) -> Result<(), FnError> {
62 let empty_old = old.empty();
63 let bytes = empty_old.persist()?;
64 self.from_persisted(&bytes).map(|_| ())
65 }
66}
67
68/// Per-instance CRDT state.
69///
70/// `'static` is required so [`CrdtState::as_any`] can downcast safely in
71/// `merge` implementations to access the concrete other-state.
72pub trait CrdtState: Send + Sync + 'static {
73 /// Return `&dyn Any` for safe downcasting in `merge` implementations.
74 ///
75 /// Implementations should expose `as_any` with the one-liner
76 /// `fn as_any(&self) -> &dyn std::any::Any { self }`.
77 fn as_any(&self) -> &dyn std::any::Any;
78
79 /// Apply an operation to this state.
80 ///
81 /// # Errors
82 ///
83 /// Returns [`FnError`] on application failure.
84 fn apply(&mut self, op: &CrdtOp) -> Result<(), FnError>;
85
86 /// Merge `other` into `self` (must be associative + commutative).
87 ///
88 /// # Errors
89 ///
90 /// Returns [`FnError`] on merge failure.
91 fn merge(&mut self, other: &dyn CrdtState) -> Result<(), FnError>;
92
93 /// Query the current logical value.
94 ///
95 /// # Errors
96 ///
97 /// Returns [`FnError`] if the value cannot be computed.
98 fn value(&self) -> Result<ScalarValue, FnError>;
99
100 /// Serialize for persistence.
101 ///
102 /// # Errors
103 ///
104 /// Returns [`FnError`] on serialization failure.
105 fn persist(&self) -> Result<Vec<u8>, FnError>;
106}