Skip to main content

flaron_sdk/
plasma.rs

1//! Plasma - cross-edge CRDT key/value store.
2//!
3//! Plasma replicates state across every edge in the Flaron mesh using a
4//! gossip-backed CRDT layer. Use it for state that must be visible from any
5//! edge: counters, presence, leaderboards, feature flags, ephemeral session
6//! coordination. For per-site state with TTLs use [`crate::spark`] instead.
7//!
8//! ## Operations
9//!
10//! * [`get`] / [`set`] / [`delete`] - last-writer-wins register semantics.
11//! * [`increment`] / [`decrement`] - PN-counter CRDT, returns the new value.
12//! * [`list`] - enumerate all keys for this site.
13//!
14//! ## Capability gate
15//!
16//! Writes (`set`, `delete`, `increment`, `decrement`) require the flare's
17//! `WritesPlasmaKV` capability. Without it, every write returns
18//! [`PlasmaError::NoCapability`].
19
20use crate::{ffi, mem};
21
22/// Errors returned by Plasma write operations.
23///
24/// Codes match the `plasmaErr*` constants in
25/// `internal/corona/hostapi_kv.go`.
26#[derive(Debug, thiserror::Error)]
27pub enum PlasmaError {
28    /// Plasma is not configured on this edge node.
29    #[error("plasma: not available")]
30    NotAvailable,
31
32    /// Per-invocation write count exceeded.
33    #[error("plasma: write limit exceeded")]
34    WriteLimit,
35
36    /// Value exceeds the host's per-key size cap.
37    #[error("plasma: value too large")]
38    TooLarge,
39
40    /// Key failed validation.
41    #[error("plasma: invalid key")]
42    BadKey,
43
44    /// Flare lacks the `WritesPlasmaKV` capability.
45    #[error("plasma: no capability")]
46    NoCapability,
47
48    /// Internal host error - see edge logs.
49    #[error("plasma: internal error")]
50    Internal,
51
52    /// Unknown error code returned by the host.
53    #[error("plasma: unknown error code {0}")]
54    Unknown(i32),
55}
56
57impl PlasmaError {
58    fn from_code(code: i32) -> Self {
59        match code {
60            1 => Self::NotAvailable,
61            2 => Self::WriteLimit,
62            3 => Self::TooLarge,
63            4 => Self::BadKey,
64            5 => Self::NoCapability,
65            6 => Self::Internal,
66            other => Self::Unknown(other),
67        }
68    }
69}
70
71/// Get a value from Plasma.
72///
73/// Returns `None` if the key does not exist, the read limit was hit, or
74/// Plasma is not configured on this edge.
75pub fn get(key: &str) -> Option<Vec<u8>> {
76    let (key_ptr, key_len) = mem::host_arg_str(key);
77    let result = unsafe { ffi::plasma_get(key_ptr, key_len) };
78    // SAFETY: host writes the raw value bytes into the bump arena.
79    unsafe { mem::read_packed_bytes(result) }
80}
81
82/// Convenience: get a value as a UTF-8 string. Returns `None` if the key is
83/// missing or the bytes are not valid UTF-8.
84pub fn get_string(key: &str) -> Option<String> {
85    let bytes = get(key)?;
86    String::from_utf8(bytes).ok()
87}
88
89/// Write a value to Plasma. Last-writer-wins.
90///
91/// Requires `WritesPlasmaKV` capability.
92pub fn set(key: &str, value: &[u8]) -> Result<(), PlasmaError> {
93    let (key_ptr, key_len) = mem::host_arg_str(key);
94    let (val_ptr, val_len) = mem::host_arg_bytes(value);
95    let code = unsafe { ffi::plasma_set(key_ptr, key_len, val_ptr, val_len) };
96    if code == 0 {
97        Ok(())
98    } else {
99        Err(PlasmaError::from_code(code))
100    }
101}
102
103/// Delete a key from Plasma. Idempotent.
104///
105/// Requires `WritesPlasmaKV` capability.
106pub fn delete(key: &str) -> Result<(), PlasmaError> {
107    let (key_ptr, key_len) = mem::host_arg_str(key);
108    let code = unsafe { ffi::plasma_delete(key_ptr, key_len) };
109    if code == 0 {
110        Ok(())
111    } else {
112        Err(PlasmaError::from_code(code))
113    }
114}
115
116/// Atomically add `delta` to a PN-counter at `key` and return the new value.
117///
118/// Counters are CRDTs - concurrent increments from multiple edges merge
119/// commutatively without coordination. Pass a negative delta to decrement,
120/// or use [`decrement`] for clarity.
121///
122/// Returns `None` if the operation failed (capability missing, write limit,
123/// host error).
124pub fn increment(key: &str, delta: i64) -> Option<i64> {
125    let (key_ptr, key_len) = mem::host_arg_str(key);
126    let result = unsafe { ffi::plasma_increment(key_ptr, key_len, delta) };
127    // SAFETY: host writes 8 LE bytes representing the new counter value.
128    let bytes = unsafe { mem::read_packed_bytes(result) }?;
129    if bytes.len() != 8 {
130        return None;
131    }
132    let arr: [u8; 8] = [
133        bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
134    ];
135    Some(i64::from_le_bytes(arr))
136}
137
138/// Atomically subtract `delta` from a PN-counter at `key` and return the new
139/// value.
140pub fn decrement(key: &str, delta: i64) -> Option<i64> {
141    let (key_ptr, key_len) = mem::host_arg_str(key);
142    let result = unsafe { ffi::plasma_decrement(key_ptr, key_len, delta) };
143    let bytes = unsafe { mem::read_packed_bytes(result) }?;
144    if bytes.len() != 8 {
145        return None;
146    }
147    let arr: [u8; 8] = [
148        bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
149    ];
150    Some(i64::from_le_bytes(arr))
151}
152
153/// List all keys in this site's Plasma store.
154///
155/// Returns an empty `Vec` if Plasma is not configured or the read limit was
156/// hit.
157pub fn list() -> Vec<String> {
158    let result = unsafe { ffi::plasma_list() };
159    let Some(json_bytes) = (unsafe { mem::read_packed_bytes(result) }) else {
160        return Vec::new();
161    };
162    serde_json::from_slice(&json_bytes).unwrap_or_default()
163}
164
165#[cfg(test)]
166mod tests {
167    use super::*;
168    use crate::ffi::test_host;
169
170    #[test]
171    fn get_returns_stored_value() {
172        test_host::reset();
173        test_host::with_mock(|m| {
174            m.plasma_store.insert("k".into(), b"value".to_vec());
175        });
176        assert_eq!(get("k"), Some(b"value".to_vec()));
177    }
178
179    #[test]
180    fn get_none_for_missing() {
181        test_host::reset();
182        assert!(get("missing").is_none());
183    }
184
185    #[test]
186    fn get_string_decodes_utf8() {
187        test_host::reset();
188        test_host::with_mock(|m| {
189            m.plasma_store
190                .insert("k".into(), "héllo".as_bytes().to_vec());
191        });
192        assert_eq!(get_string("k").as_deref(), Some("héllo"));
193    }
194
195    #[test]
196    fn set_stores_value() {
197        test_host::reset();
198        set("k", b"v").expect("set should succeed");
199        assert_eq!(
200            test_host::read_mock(|m| m.plasma_store.get("k").cloned()),
201            Some(b"v".to_vec())
202        );
203    }
204
205    #[test]
206    fn set_maps_error_codes() {
207        for (code, expected_disc) in [
208            (1, PlasmaError::NotAvailable),
209            (2, PlasmaError::WriteLimit),
210            (3, PlasmaError::TooLarge),
211            (4, PlasmaError::BadKey),
212            (5, PlasmaError::NoCapability),
213            (6, PlasmaError::Internal),
214        ] {
215            test_host::reset();
216            test_host::with_mock(|m| m.plasma_set_error = code);
217            let err = set("k", b"v").unwrap_err();
218            assert!(
219                std::mem::discriminant(&err) == std::mem::discriminant(&expected_disc),
220                "code {} should map to {:?}, got {:?}",
221                code,
222                expected_disc,
223                err,
224            );
225        }
226    }
227
228    #[test]
229    fn set_unknown_error_code() {
230        test_host::reset();
231        test_host::with_mock(|m| m.plasma_set_error = 42);
232        match set("k", b"v").unwrap_err() {
233            PlasmaError::Unknown(42) => {}
234            other => panic!("expected Unknown(42), got {:?}", other),
235        }
236    }
237
238    #[test]
239    fn delete_removes_value() {
240        test_host::reset();
241        test_host::with_mock(|m| {
242            m.plasma_store.insert("k".into(), b"v".to_vec());
243        });
244        delete("k").unwrap();
245        assert!(test_host::read_mock(|m| m.plasma_store.is_empty()));
246    }
247
248    #[test]
249    fn increment_returns_new_counter_value() {
250        test_host::reset();
251        let v1 = increment("c", 5).expect("first increment");
252        assert_eq!(v1, 5);
253        let v2 = increment("c", 3).expect("second increment");
254        assert_eq!(v2, 8);
255        let v3 = increment("c", -2).expect("negative delta");
256        assert_eq!(v3, 6);
257    }
258
259    #[test]
260    fn decrement_returns_new_counter_value() {
261        test_host::reset();
262        increment("c", 10).unwrap();
263        let v = decrement("c", 4).expect("decrement");
264        assert_eq!(v, 6);
265    }
266
267    #[test]
268    fn increment_captures_args() {
269        test_host::reset();
270        increment("counter", 7).unwrap();
271        let captured = test_host::read_mock(|m| m.last_plasma_increment.clone());
272        assert_eq!(captured, Some(("counter".into(), 7)));
273    }
274
275    #[test]
276    fn increment_returns_none_on_error() {
277        test_host::reset();
278        test_host::with_mock(|m| m.plasma_increment_error = true);
279        assert!(increment("c", 1).is_none());
280    }
281
282    #[test]
283    fn list_returns_keys() {
284        test_host::reset();
285        test_host::with_mock(|m| {
286            m.plasma_store.insert("a".into(), b"1".to_vec());
287            m.plasma_store.insert("b".into(), b"2".to_vec());
288        });
289        let mut keys = list();
290        keys.sort();
291        assert_eq!(keys, vec!["a".to_string(), "b".to_string()]);
292    }
293}