Skip to main content

orbit_rs/cache/
mod.rs

1//! `OrbitCache` — fleet-shared binary K,V cache over the Orbit ring.
2//!
3//! This is the raw primitive layer: keys and values are bytes, the
4//! shared truth is the fleet ring, and higher layers decide how bytes
5//! become application structs or FFI payloads.
6//!
7//! ## V0 shape
8//!
9//! Each mutation is one ring frame:
10//!
11//! ```text
12//! [ op:u8 ][ key_len:u16 LE ][ value_len:u16 LE ][ expires_at_ms:u64 LE ]
13//! [ key bytes ][ value bytes ]
14//! ```
15//!
16//! Reads walk the ring backwards and stop at the newest frame for the
17//! scoped key. A delete frame shadows older puts. A reset frame shadows
18//! older puts/deletes for the cache prefix. An expired put is a miss
19//! and also shadows older puts, matching normal cache TTL semantics.
20//!
21//! ## Why no typed values here
22//!
23//! `orbit-rs` is framework-agnostic. It does not know application
24//! object models, external cache APIs, FFI values, or serde choices. It
25//! only preserves bytes and coarse cache semantics. Typed decode/L1
26//! lives in adapter crates above `orbit-rs`.
27//!
28//! ## Deferred substrate
29//!
30//! V0 stores the value inline in the ring frame, so payloads are small
31//! and bounded by the SHM ring payload size. The eventual larger-value
32//! shape is ring-as-mutation-log plus an indexed SHM arena. The public
33//! byte-oriented API is intended to survive that swap.
34
35use std::sync::Arc;
36use std::time::{Duration, SystemTime, UNIX_EPOCH};
37
38use bytes::{BufMut, Bytes, BytesMut};
39
40use crate::error::{Error, Result};
41use crate::fleet::Fleet;
42use crate::id::NetId64;
43use crate::typed::OrbitTyped;
44
45/// Cache frame payload limit for V0. On Unix this matches the SHM
46/// ring's fixed slot payload size; non-Unix keeps the same contract so
47/// tests and callers do not accidentally rely on unbounded in-memory
48/// frames.
49#[cfg(unix)]
50pub const CACHE_PAYLOAD_MAX: usize = crate::ring::shm::PAYLOAD_MAX;
51#[cfg(not(unix))]
52pub const CACHE_PAYLOAD_MAX: usize = 256;
53
54const HEADER_LEN: usize = 1 + 2 + 2 + 8;
55const OP_PUT: u8 = 1;
56const OP_DELETE: u8 = 2;
57const OP_RESET: u8 = 3;
58
59/// Dedicated ring kind for raw cache mutations.
60#[derive(Clone, Debug)]
61struct OrbitCacheRecord;
62
63impl OrbitTyped for OrbitCacheRecord {
64    // Hand-picked V0 kind. Build-time KIND allocation will replace
65    // these manual values later.
66    const KIND: u8 = 200;
67}
68
69/// Fleet-shared binary cache. Cheap to clone.
70#[derive(Clone)]
71pub struct OrbitCache {
72    fleet: Arc<Fleet>,
73    prefix: Arc<str>,
74}
75
76/// Decoded cache entry returned by [`OrbitCache::get_entry`].
77#[derive(Clone, Debug, PartialEq, Eq)]
78pub struct OrbitCacheEntry {
79    pub value: Bytes,
80    pub epoch: u64,
81    pub expires_at_ms: Option<u64>,
82}
83
84/// Honest read result: either the newest matching frame is a value, or
85/// the key is currently absent. `epoch` is the ring counter of the
86/// newest matching frame when one exists, useful for local L1
87/// invalidation.
88#[derive(Clone, Debug, PartialEq, Eq)]
89pub enum OrbitCacheRead {
90    Hit(OrbitCacheEntry),
91    Miss { epoch: Option<u64> },
92}
93
94impl OrbitCacheRead {
95    pub fn into_value(self) -> Option<Vec<u8>> {
96        match self {
97            Self::Hit(entry) => Some(entry.value.to_vec()),
98            Self::Miss { .. } => None,
99        }
100    }
101
102    pub fn epoch(&self) -> Option<u64> {
103        match self {
104            Self::Hit(entry) => Some(entry.epoch),
105            Self::Miss { epoch } => *epoch,
106        }
107    }
108}
109
110impl OrbitCache {
111    /// Build a cache without a key prefix.
112    pub fn new(fleet: Arc<Fleet>) -> Self {
113        Self::with_prefix(fleet, "")
114    }
115
116    /// Build a cache that prepends `prefix` to every key before it
117    /// hits the wire. Prefixes are byte identity, not display labels;
118    /// use them for namespaces such as `php:apcu:` or `rust:User:`.
119    pub fn with_prefix(fleet: Arc<Fleet>, prefix: impl Into<Arc<str>>) -> Self {
120        Self {
121            fleet,
122            prefix: prefix.into(),
123        }
124    }
125
126    pub fn prefix(&self) -> &str {
127        &self.prefix
128    }
129
130    /// Store `value` under `key`. `ttl = None` means forever until a
131    /// later put/delete shadows it.
132    pub fn put(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> Result<NetId64> {
133        let expires_at_ms = ttl
134            .map(|ttl| now_ms().saturating_add(duration_ms(ttl)))
135            .unwrap_or(0);
136        let payload = encode_frame(
137            OP_PUT,
138            self.scoped_key(key).as_bytes(),
139            value,
140            expires_at_ms,
141        )?;
142        Ok(self
143            .fleet
144            .publish::<OrbitCacheRecord>(OP_PUT, expires_at_ms, payload))
145    }
146
147    /// Delete `key` by publishing a tombstone frame.
148    pub fn delete(&self, key: &str) -> Result<NetId64> {
149        let payload = encode_frame(OP_DELETE, self.scoped_key(key).as_bytes(), &[], 0)?;
150        Ok(self
151            .fleet
152            .publish::<OrbitCacheRecord>(OP_DELETE, 0, payload))
153    }
154
155    /// Reset the current cache prefix by publishing a namespace
156    /// tombstone. Existing frames remain in the ring, but reads treat
157    /// older matching entries as absent.
158    pub fn reset(&self) -> Result<NetId64> {
159        let payload = encode_frame(OP_RESET, self.prefix.as_bytes(), &[], 0)?;
160        Ok(self.fleet.publish::<OrbitCacheRecord>(OP_RESET, 0, payload))
161    }
162
163    /// Read the current value bytes, if present and unexpired.
164    pub fn get(&self, key: &str) -> Option<Vec<u8>> {
165        self.get_entry(key).into_value()
166    }
167
168    /// Read with epoch metadata for adapter-level L1 caches.
169    pub fn get_entry(&self, key: &str) -> OrbitCacheRead {
170        let scoped = self.scoped_key(key);
171        let expected_key = scoped.as_bytes();
172        let head = self.fleet.head::<OrbitCacheRecord>();
173        if head == 0 {
174            return OrbitCacheRead::Miss { epoch: None };
175        }
176
177        let capacity = self.fleet.ring_capacity::<OrbitCacheRecord>() as u64;
178        let walk_count = head.min(capacity);
179        let now = now_ms();
180
181        for i in 0..walk_count {
182            let counter = head - 1 - i;
183            let Some(frame) = self.fleet.read_at::<OrbitCacheRecord>(counter) else {
184                if counter == 0 {
185                    break;
186                }
187                continue;
188            };
189            let Some(decoded) = decode_frame(&frame.payload) else {
190                if counter == 0 {
191                    break;
192                }
193                continue;
194            };
195            if decoded.op == OP_RESET && expected_key.starts_with(decoded.key) {
196                return OrbitCacheRead::Miss {
197                    epoch: Some(frame.id.counter()),
198                };
199            }
200            if decoded.key != expected_key {
201                if counter == 0 {
202                    break;
203                }
204                continue;
205            }
206
207            let epoch = frame.id.counter();
208            return match decoded.op {
209                OP_PUT if decoded.expires_at_ms != 0 && decoded.expires_at_ms <= now => {
210                    OrbitCacheRead::Miss { epoch: Some(epoch) }
211                }
212                OP_PUT => OrbitCacheRead::Hit(OrbitCacheEntry {
213                    value: frame.payload.slice(decoded.value_start..decoded.value_end),
214                    epoch,
215                    expires_at_ms: (decoded.expires_at_ms != 0).then_some(decoded.expires_at_ms),
216                }),
217                OP_DELETE => OrbitCacheRead::Miss { epoch: Some(epoch) },
218                _ => OrbitCacheRead::Miss { epoch: Some(epoch) },
219            };
220        }
221
222        OrbitCacheRead::Miss { epoch: None }
223    }
224
225    fn scoped_key(&self, key: &str) -> String {
226        if self.prefix.is_empty() {
227            key.to_string()
228        } else {
229            format!("{}{}", self.prefix, key)
230        }
231    }
232}
233
234struct DecodedFrame<'a> {
235    op: u8,
236    key: &'a [u8],
237    value_start: usize,
238    value_end: usize,
239    expires_at_ms: u64,
240}
241
242fn encode_frame(op: u8, key: &[u8], value: &[u8], expires_at_ms: u64) -> Result<Bytes> {
243    let total = HEADER_LEN + key.len() + value.len();
244    if key.len() > u16::MAX as usize || value.len() > u16::MAX as usize || total > CACHE_PAYLOAD_MAX
245    {
246        return Err(Error::CacheFrameTooLarge {
247            key_len: key.len(),
248            value_len: value.len(),
249            max_payload: CACHE_PAYLOAD_MAX,
250        });
251    }
252
253    let mut buf = BytesMut::with_capacity(total);
254    buf.put_u8(op);
255    buf.put_u16_le(key.len() as u16);
256    buf.put_u16_le(value.len() as u16);
257    buf.put_u64_le(expires_at_ms);
258    buf.put_slice(key);
259    buf.put_slice(value);
260    Ok(buf.freeze())
261}
262
263fn decode_frame(payload: &Bytes) -> Option<DecodedFrame<'_>> {
264    if payload.len() < HEADER_LEN {
265        return None;
266    }
267
268    let op = payload[0];
269    let key_len = u16::from_le_bytes(payload[1..3].try_into().ok()?) as usize;
270    let value_len = u16::from_le_bytes(payload[3..5].try_into().ok()?) as usize;
271    let expires_at_ms = u64::from_le_bytes(payload[5..13].try_into().ok()?);
272    let key_start = HEADER_LEN;
273    let key_end = key_start.checked_add(key_len)?;
274    let value_end = key_end.checked_add(value_len)?;
275    if payload.len() < value_end {
276        return None;
277    }
278
279    Some(DecodedFrame {
280        op,
281        key: &payload[key_start..key_end],
282        value_start: key_end,
283        value_end,
284        expires_at_ms,
285    })
286}
287
288fn now_ms() -> u64 {
289    SystemTime::now()
290        .duration_since(UNIX_EPOCH)
291        .map(|d| d.as_millis().min(u128::from(u64::MAX)) as u64)
292        .unwrap_or(0)
293}
294
295fn duration_ms(ttl: Duration) -> u64 {
296    ttl.as_millis().max(1).min(u128::from(u64::MAX)) as u64
297}
298
299#[cfg(test)]
300mod tests {
301    use super::OrbitCache;
302    use crate::Fleet;
303
304    #[test]
305    fn reset_shadows_previous_values_inside_prefix() {
306        let fleet = Fleet::join("cache_reset", 1).expect("fleet").into();
307        let cache = OrbitCache::with_prefix(fleet, "php:");
308
309        cache.put("token", b"old", None).expect("put");
310        assert_eq!(cache.get("token"), Some(b"old".to_vec()));
311
312        cache.reset().expect("reset");
313        assert_eq!(cache.get("token"), None);
314
315        cache.put("token", b"new", None).expect("put after reset");
316        assert_eq!(cache.get("token"), Some(b"new".to_vec()));
317    }
318}