// flaron_sdk/spark.rs
//! Spark - per-site KV store with TTL, persisted to disk on the edge.
//!
//! Spark gives a flare a fast local key/value store scoped to its domain.
//! Writes hit the local disk; reads are pure in-memory. Use it for things
//! like edge-side rate limiters, short-lived session blobs, response caches,
//! or anything that wants TTL semantics and doesn't need cross-edge
//! consistency. For cross-edge state, use [`crate::plasma`] instead.
//!
//! ## Wire format note
//!
//! `spark_get` returns a 4-byte little-endian `u32` TTL prefix followed by
//! the value bytes. The SDK strips the prefix for you and returns a
//! [`SparkEntry`] with the parsed `ttl_secs` plus the raw value. A TTL of
//! `0` means "no expiry".
//!
//! ## Capability gate
//!
//! Writes (`set`, `delete`, `pull`) require the flare's `WritesSparkKV`
//! capability - set it on the flare config in the dashboard. Without it,
//! writes return [`SparkError::NoCapability`].

use crate::{ffi, mem};

/// A Spark entry returned by [`get`].
#[derive(Debug, Clone)]
pub struct SparkEntry {
    /// Raw bytes exactly as stored.
    pub value: Vec<u8>,

    /// Seconds of TTL remaining; `0` means the entry never expires.
    pub ttl_secs: u32,
}

35/// Errors returned by Spark write operations.
36///
37/// The numeric codes below are stable across SDK versions and match the
38/// `sparkErr*` constants in `internal/corona/hostapi_spark.go`.
39#[derive(Debug, thiserror::Error)]
40pub enum SparkError {
41    /// TTL value is invalid (negative, or above the host's per-flare cap).
42    #[error("spark: invalid TTL")]
43    InvalidTtl,
44
45    /// Value exceeds the host's per-key size cap (`max_kv_value_bytes`,
46    /// default 64 KiB).
47    #[error("spark: value too large")]
48    TooLarge,
49
50    /// Per-invocation write count exceeded.
51    #[error("spark: write limit exceeded")]
52    WriteLimit,
53
54    /// On-disk quota for this site is full.
55    #[error("spark: disk quota exceeded")]
56    QuotaExceeded,
57
58    /// Spark is not configured on this edge.
59    #[error("spark: not available")]
60    NotAvailable,
61
62    /// Internal host error - see edge logs for details.
63    #[error("spark: internal error")]
64    Internal,
65
66    /// Per-invocation read count exceeded. (Not currently returned by the
67    /// host but reserved in the protocol.)
68    #[error("spark: read limit exceeded")]
69    ReadLimit,
70
71    /// Key failed validation (must match `[a-zA-Z0-9:._-]{1,256}` and not
72    /// start with `__flaron:` or `__sys:`).
73    #[error("spark: invalid key")]
74    BadKey,
75
76    /// Flare lacks the `WritesSparkKV` capability for this operation.
77    #[error("spark: no capability")]
78    NoCapability,
79
80    /// Unknown error code returned by the host.
81    #[error("spark: unknown error code {0}")]
82    Unknown(i32),
83}
84
85impl SparkError {
86    fn from_code(code: i32) -> Self {
87        match code {
88            1 => Self::InvalidTtl,
89            2 => Self::TooLarge,
90            3 => Self::WriteLimit,
91            4 => Self::QuotaExceeded,
92            5 => Self::NotAvailable,
93            6 => Self::Internal,
94            7 => Self::ReadLimit,
95            8 => Self::BadKey,
96            9 => Self::NoCapability,
97            other => Self::Unknown(other),
98        }
99    }
100}
101
102/// Get a value from Spark.
103///
104/// Returns `None` if the key does not exist, the read limit was hit, or
105/// Spark is not configured. Use [`get_string`] when you only need a UTF-8
106/// payload.
107pub fn get(key: &str) -> Option<SparkEntry> {
108    let (key_ptr, key_len) = mem::host_arg_str(key);
109    let result = unsafe { ffi::spark_get(key_ptr, key_len) };
110    if result == 0 {
111        return None;
112    }
113    let (ptr, len) = mem::decode_ptr_len(result);
114    if len < 4 {
115        // Malformed payload - host always prefixes with 4-byte TTL.
116        return None;
117    }
118    // SAFETY: host writes 4-byte LE TTL prefix + value bytes into the arena.
119    let bytes = unsafe { mem::read_bytes(ptr, len) };
120    let ttl_secs = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
121    let value = bytes[4..].to_vec();
122    Some(SparkEntry { value, ttl_secs })
123}
124
125/// Convenience: get a value and deserialise it as a UTF-8 string.
126///
127/// Returns `None` if the key is missing or the value bytes are not valid
128/// UTF-8.
129pub fn get_string(key: &str) -> Option<String> {
130    let entry = get(key)?;
131    String::from_utf8(entry.value).ok()
132}
133
134/// Write a value to Spark with the given TTL in seconds.
135///
136/// Pass `0` for no expiry. Requires `WritesSparkKV` capability.
137pub fn set(key: &str, value: &[u8], ttl_secs: u32) -> Result<(), SparkError> {
138    let (key_ptr, key_len) = mem::host_arg_str(key);
139    let (val_ptr, val_len) = mem::host_arg_bytes(value);
140    let code = unsafe { ffi::spark_set(key_ptr, key_len, val_ptr, val_len, ttl_secs as i32) };
141    if code == 0 {
142        Ok(())
143    } else {
144        Err(SparkError::from_code(code))
145    }
146}
147
148/// Delete a key from Spark. No-op if the key does not exist.
149///
150/// Requires `WritesSparkKV` capability. The host returns nothing - failures
151/// are silent (logged on the edge node, not surfaced to the flare).
152pub fn delete(key: &str) {
153    let (key_ptr, key_len) = mem::host_arg_str(key);
154    unsafe { ffi::spark_delete(key_ptr, key_len) }
155}
156
157/// List all keys in this site's Spark store.
158///
159/// Returns an empty `Vec` if Spark is not configured or the read limit was
160/// hit.
161pub fn list() -> Vec<String> {
162    let result = unsafe { ffi::spark_list() };
163    // SAFETY: host writes a JSON array of key strings into the bump arena.
164    let Some(json_bytes) = (unsafe { mem::read_packed_bytes(result) }) else {
165        return Vec::new();
166    };
167    serde_json::from_slice(&json_bytes).unwrap_or_default()
168}
169
170/// Errors returned by [`pull`].
171#[derive(Debug, thiserror::Error)]
172pub enum SparkPullError {
173    /// Spark or the edge registry is not configured on this node.
174    #[error("spark pull: not available")]
175    NotAvailable,
176
177    /// Internal host error - see edge logs.
178    #[error("spark pull: internal error")]
179    Internal,
180
181    /// Flare lacks the `WritesSparkKV` capability.
182    #[error("spark pull: no capability")]
183    NoCapability,
184
185    /// One of the keys (or the origin node ID) failed validation.
186    #[error("spark pull: invalid key or origin")]
187    BadKey,
188
189    /// Per-invocation pull limit reached (`max_spark_pull_per_invocation`,
190    /// currently 1).
191    #[error("spark pull: rate limited")]
192    WriteLimit,
193
194    /// Unknown error code returned by the host.
195    #[error("spark pull: unknown error code {0}")]
196    Unknown(i32),
197}
198
199impl SparkPullError {
200    /// Map a positive `sparkErr*` numeric code (as defined in
201    /// `internal/corona/hostapi_spark.go`) to a typed error variant.
202    ///
203    /// Callers receiving the host's signed return value must negate it before
204    /// calling this - see [`pull`].
205    fn from_code(code: i32) -> Self {
206        match code {
207            3 => Self::WriteLimit,
208            5 => Self::NotAvailable,
209            6 => Self::Internal,
210            8 => Self::BadKey,
211            9 => Self::NoCapability,
212            other => Self::Unknown(other),
213        }
214    }
215}
216
217/// Migrate keys from another edge node into this one's Spark store.
218///
219/// `origin_node` is the target node's ID (as known to the edge registry).
220/// `keys` is the list of keys to migrate from that node into this site.
221///
222/// On success returns the number of keys actually migrated. The host rate
223/// limits this to one pull per invocation; subsequent calls return
224/// [`SparkPullError::WriteLimit`].
225///
226/// ## Wire convention
227///
228/// The host returns a signed `i32`:
229///
230/// - `>= 0` -> success, value is the count of migrated keys.
231/// - `< 0`  -> error, the absolute value is the matching `sparkErr*` code.
232pub fn pull(origin_node: &str, keys: &[&str]) -> Result<u32, SparkPullError> {
233    let keys_json = serde_json::to_string(keys).unwrap_or_else(|_| String::from("[]"));
234    let (origin_ptr, origin_len) = mem::host_arg_str(origin_node);
235    let (keys_ptr, keys_len) = mem::host_arg_str(&keys_json);
236    let code = unsafe { ffi::spark_pull(origin_ptr, origin_len, keys_ptr, keys_len) };
237    if code >= 0 {
238        Ok(code as u32)
239    } else {
240        Err(SparkPullError::from_code(-code))
241    }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ffi::test_host;

    /// `get` must strip the 4-byte TTL prefix and hand back the raw value.
    #[test]
    fn get_strips_ttl_prefix_and_returns_value() {
        test_host::reset();
        test_host::with_mock(|m| {
            m.spark_store.insert("k".into(), (b"hello".to_vec(), 60));
        });
        let hit = get("k").expect("get should hit the store");
        assert_eq!(hit.value, b"hello");
        assert_eq!(hit.ttl_secs, 60);
    }

    /// A stored TTL of zero round-trips as "no expiry".
    #[test]
    fn get_handles_zero_ttl_no_expiry() {
        test_host::reset();
        test_host::with_mock(|m| {
            m.spark_store.insert("k".into(), (b"forever".to_vec(), 0));
        });
        let hit = get("k").unwrap();
        assert_eq!(hit.ttl_secs, 0);
        assert_eq!(hit.value, b"forever");
    }

    #[test]
    fn get_returns_none_for_missing_key() {
        test_host::reset();
        assert!(get("missing").is_none());
    }

    #[test]
    fn get_string_decodes_utf8() {
        test_host::reset();
        test_host::with_mock(|m| {
            m.spark_store
                .insert("k".into(), ("héllo".as_bytes().to_vec(), 30));
        });
        assert_eq!(get_string("k").as_deref(), Some("héllo"));
    }

    #[test]
    fn get_string_returns_none_for_invalid_utf8() {
        test_host::reset();
        test_host::with_mock(|m| {
            m.spark_store.insert("k".into(), (vec![0xff, 0xfe], 30));
        });
        assert!(get_string("k").is_none());
    }

    #[test]
    fn set_writes_to_store() {
        test_host::reset();
        set("greeting", b"hi", 120).expect("set should succeed");
        let stored = test_host::read_mock(|m| m.spark_store.get("greeting").cloned());
        assert_eq!(stored, Some((b"hi".to_vec(), 120)));
    }

    #[test]
    fn set_captures_args() {
        test_host::reset();
        set("k", b"v", 30).unwrap();
        let seen = test_host::read_mock(|m| m.last_spark_set.clone());
        assert_eq!(seen, Some(("k".into(), b"v".to_vec(), 30)));
    }

    /// Every documented sparkErr* code must map to its typed variant.
    #[test]
    fn set_maps_error_codes() {
        let table = [
            (1, SparkError::InvalidTtl),
            (2, SparkError::TooLarge),
            (3, SparkError::WriteLimit),
            (4, SparkError::QuotaExceeded),
            (5, SparkError::NotAvailable),
            (6, SparkError::Internal),
            (7, SparkError::ReadLimit),
            (8, SparkError::BadKey),
            (9, SparkError::NoCapability),
        ];
        for (code, want) in table {
            test_host::reset();
            test_host::with_mock(|m| m.spark_set_error = code);
            let got = set("k", b"v", 30).unwrap_err();
            assert!(
                std::mem::discriminant(&got) == std::mem::discriminant(&want),
                "code {} should map to {:?}, got {:?}",
                code,
                want,
                got,
            );
        }
    }

    #[test]
    fn set_unknown_error_code() {
        test_host::reset();
        test_host::with_mock(|m| m.spark_set_error = 99);
        match set("k", b"v", 30).unwrap_err() {
            SparkError::Unknown(99) => {}
            unexpected => panic!("expected Unknown(99), got {:?}", unexpected),
        }
    }

    #[test]
    fn delete_removes_from_store() {
        test_host::reset();
        test_host::with_mock(|m| {
            m.spark_store.insert("k".into(), (b"v".to_vec(), 60));
        });
        delete("k");
        assert!(test_host::read_mock(|m| m.spark_store.is_empty()));
        assert_eq!(test_host::read_mock(|m| m.spark_deletes.clone()), vec!["k"]);
    }

    #[test]
    fn list_returns_keys() {
        test_host::reset();
        test_host::with_mock(|m| {
            m.spark_store.insert("a".into(), (b"1".to_vec(), 10));
            m.spark_store.insert("b".into(), (b"2".to_vec(), 20));
        });
        let mut listed = list();
        listed.sort();
        assert_eq!(listed, vec!["a".to_string(), "b".to_string()]);
    }

    #[test]
    fn list_empty_when_no_keys() {
        test_host::reset();
        assert!(list().is_empty());
    }

    /// The key list must cross the FFI boundary as a JSON array.
    #[test]
    fn pull_serializes_keys_as_json() {
        test_host::reset();
        test_host::with_mock(|m| m.spark_pull_result = 3);
        let migrated = pull("origin-node", &["a", "b", "c"]).unwrap();
        assert_eq!(migrated, 3);
        let calls = test_host::read_mock(|m| m.spark_pull_calls.clone());
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, "origin-node");
        assert_eq!(calls[0].1, r#"["a","b","c"]"#);
    }

    #[test]
    fn pull_zero_count_is_ok() {
        test_host::reset();
        test_host::with_mock(|m| m.spark_pull_result = 0);
        assert_eq!(pull("o", &[]).unwrap(), 0);
    }

    #[test]
    fn pull_error_from_code_mapping() {
        match SparkPullError::from_code(3) {
            SparkPullError::WriteLimit => {}
            unexpected => panic!("3 should map to WriteLimit, got {:?}", unexpected),
        }
        match SparkPullError::from_code(5) {
            SparkPullError::NotAvailable => {}
            unexpected => panic!("5 should map to NotAvailable, got {:?}", unexpected),
        }
        match SparkPullError::from_code(6) {
            SparkPullError::Internal => {}
            unexpected => panic!("6 should map to Internal, got {:?}", unexpected),
        }
        match SparkPullError::from_code(8) {
            SparkPullError::BadKey => {}
            unexpected => panic!("8 should map to BadKey, got {:?}", unexpected),
        }
        match SparkPullError::from_code(9) {
            SparkPullError::NoCapability => {}
            unexpected => panic!("9 should map to NoCapability, got {:?}", unexpected),
        }
        match SparkPullError::from_code(99) {
            SparkPullError::Unknown(99) => {}
            unexpected => panic!("99 should map to Unknown(99), got {:?}", unexpected),
        }
    }

    /// Corona convention: negative return values are errors, the absolute
    /// value is the sparkErr* enum code. Verify each mapped variant.
    #[test]
    fn pull_negative_code_maps_to_typed_error() {
        let table = [
            (-3, SparkPullError::WriteLimit),
            (-5, SparkPullError::NotAvailable),
            (-6, SparkPullError::Internal),
            (-8, SparkPullError::BadKey),
            (-9, SparkPullError::NoCapability),
        ];
        for (host_code, want) in table {
            test_host::reset();
            test_host::with_mock(|m| m.spark_pull_result = host_code);
            let got = pull("origin", &["k"]).unwrap_err();
            assert!(
                std::mem::discriminant(&got) == std::mem::discriminant(&want),
                "host code {} should map to {:?}, got {:?}",
                host_code,
                want,
                got,
            );
        }
    }

    #[test]
    fn pull_unknown_negative_code_is_unknown() {
        test_host::reset();
        test_host::with_mock(|m| m.spark_pull_result = -42);
        match pull("origin", &["k"]).unwrap_err() {
            SparkPullError::Unknown(42) => {}
            unexpected => panic!("expected Unknown(42), got {:?}", unexpected),
        }
    }

    /// A non-negative host return is the migrated-key count.
    #[test]
    fn pull_positive_count_is_success() {
        test_host::reset();
        test_host::with_mock(|m| m.spark_pull_result = 7);
        assert_eq!(pull("origin", &["a", "b"]).unwrap(), 7);
    }
}
466}