Skip to main content

uni_plugin/traits/
crdt.rs

//! CRDT kind plugins: the provider trait and the per-instance state trait.
2
3pub use datafusion::scalar::ScalarValue;
4use smol_str::SmolStr;
5
6use crate::errors::FnError;
7
8/// Identifier for a CRDT kind (`"lww-register"`, `"or-set"`, …).
9#[derive(Clone, Debug, PartialEq, Eq, Hash)]
10pub struct CrdtKind(pub SmolStr);
11
12impl CrdtKind {
13    /// Construct a `CrdtKind` from a string.
14    #[must_use]
15    pub fn new(s: impl Into<SmolStr>) -> Self {
16        Self(s.into())
17    }
18}
19
/// Opaque CRDT operation payload — encoding is CRDT-kind-specific.
///
/// Equality and hashing are byte-wise over [`CrdtOp::bytes`]; no
/// kind-specific decoding is involved, so two different encodings of the
/// same logical operation compare as unequal.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CrdtOp {
    /// Raw operation bytes.
    pub bytes: Vec<u8>,
}
26
27/// A CRDT-kind provider.
28pub trait CrdtKindProvider: Send + Sync {
29    /// The CRDT kind this provider implements.
30    fn kind(&self) -> CrdtKind;
31
32    /// Construct an empty state.
33    fn empty(&self) -> Box<dyn CrdtState>;
34
35    /// Restore state from persisted bytes.
36    ///
37    /// # Errors
38    ///
39    /// Returns [`FnError`] if the bytes cannot be deserialized.
40    #[allow(
41        clippy::wrong_self_convention,
42        reason = "method belongs to the provider, not the persisted bytes"
43    )]
44    fn from_persisted(&self, bytes: &[u8]) -> Result<Box<dyn CrdtState>, FnError>;
45
46    /// Reject a reload that would tear in-flight CRDT merge state.
47    ///
48    /// Default implementation round-trips an empty `old` state through
49    /// `self.from_persisted()`. Providers that store private schema
50    /// metadata (version stamps, replica id widths) should override and
51    /// emit a richer compat check.
52    ///
53    /// A CRDT hot-swap requires that bytes produced by the **old** provider's
54    /// `persist()` are still readable by the **new** provider's
55    /// `from_persisted()`. Failing this check is a hard reload error.
56    ///
57    /// # Errors
58    ///
59    /// Returns [`FnError`] if the new provider rejects the old
60    /// provider's persisted shape.
61    fn schema_compat_check(&self, old: &dyn CrdtKindProvider) -> Result<(), FnError> {
62        let empty_old = old.empty();
63        let bytes = empty_old.persist()?;
64        self.from_persisted(&bytes).map(|_| ())
65    }
66}
67
68/// Per-instance CRDT state.
69///
70/// `'static` is required so [`CrdtState::as_any`] can downcast safely in
71/// `merge` implementations to access the concrete other-state.
72pub trait CrdtState: Send + Sync + 'static {
73    /// Return `&dyn Any` for safe downcasting in `merge` implementations.
74    ///
75    /// Implementations should expose `as_any` with the one-liner
76    /// `fn as_any(&self) -> &dyn std::any::Any { self }`.
77    fn as_any(&self) -> &dyn std::any::Any;
78
79    /// Apply an operation to this state.
80    ///
81    /// # Errors
82    ///
83    /// Returns [`FnError`] on application failure.
84    fn apply(&mut self, op: &CrdtOp) -> Result<(), FnError>;
85
86    /// Merge `other` into `self` (must be associative + commutative).
87    ///
88    /// # Errors
89    ///
90    /// Returns [`FnError`] on merge failure.
91    fn merge(&mut self, other: &dyn CrdtState) -> Result<(), FnError>;
92
93    /// Query the current logical value.
94    ///
95    /// # Errors
96    ///
97    /// Returns [`FnError`] if the value cannot be computed.
98    fn value(&self) -> Result<ScalarValue, FnError>;
99
100    /// Serialize for persistence.
101    ///
102    /// # Errors
103    ///
104    /// Returns [`FnError`] on serialization failure.
105    fn persist(&self) -> Result<Vec<u8>, FnError>;
106}