1use std::sync::Arc;
36use std::time::{Duration, SystemTime, UNIX_EPOCH};
37
38use bytes::{BufMut, Bytes, BytesMut};
39
40use crate::error::{Error, Result};
41use crate::fleet::Fleet;
42use crate::id::NetId64;
43use crate::typed::OrbitTyped;
44
45#[cfg(unix)]
50pub const CACHE_PAYLOAD_MAX: usize = crate::ring::shm::PAYLOAD_MAX;
51#[cfg(not(unix))]
52pub const CACHE_PAYLOAD_MAX: usize = 256;
53
/// Size in bytes of the fixed frame header: a `u8` op, a `u16` key length, a `u16` value
/// length and a `u64` expiry timestamp (13 bytes in total).
const HEADER_LEN: usize =
    std::mem::size_of::<u8>() + 2 * std::mem::size_of::<u16>() + std::mem::size_of::<u64>();
/// Frame op: store a value under a key.
const OP_PUT: u8 = 1;
/// Frame op: remove a single key.
const OP_DELETE: u8 = 2;
/// Frame op: shadow every earlier frame whose key starts with the frame's key.
const OP_RESET: u8 = 3;
58
59#[derive(Clone, Debug)]
61struct OrbitCacheRecord;
62
63impl OrbitTyped for OrbitCacheRecord {
64 const KIND: u8 = 200;
67}
68
69#[derive(Clone)]
71pub struct OrbitCache {
72 fleet: Arc<Fleet>,
73 prefix: Arc<str>,
74}
75
76#[derive(Clone, Debug, PartialEq, Eq)]
78pub struct OrbitCacheEntry {
79 pub value: Bytes,
80 pub epoch: u64,
81 pub expires_at_ms: Option<u64>,
82}
83
84#[derive(Clone, Debug, PartialEq, Eq)]
89pub enum OrbitCacheRead {
90 Hit(OrbitCacheEntry),
91 Miss { epoch: Option<u64> },
92}
93
94impl OrbitCacheRead {
95 pub fn into_value(self) -> Option<Vec<u8>> {
96 match self {
97 Self::Hit(entry) => Some(entry.value.to_vec()),
98 Self::Miss { .. } => None,
99 }
100 }
101
102 pub fn epoch(&self) -> Option<u64> {
103 match self {
104 Self::Hit(entry) => Some(entry.epoch),
105 Self::Miss { epoch } => *epoch,
106 }
107 }
108}
109
110impl OrbitCache {
111 pub fn new(fleet: Arc<Fleet>) -> Self {
113 Self::with_prefix(fleet, "")
114 }
115
116 pub fn with_prefix(fleet: Arc<Fleet>, prefix: impl Into<Arc<str>>) -> Self {
120 Self {
121 fleet,
122 prefix: prefix.into(),
123 }
124 }
125
126 pub fn prefix(&self) -> &str {
127 &self.prefix
128 }
129
130 pub fn put(&self, key: &str, value: &[u8], ttl: Option<Duration>) -> Result<NetId64> {
133 let expires_at_ms = ttl
134 .map(|ttl| now_ms().saturating_add(duration_ms(ttl)))
135 .unwrap_or(0);
136 let payload = encode_frame(
137 OP_PUT,
138 self.scoped_key(key).as_bytes(),
139 value,
140 expires_at_ms,
141 )?;
142 Ok(self
143 .fleet
144 .publish::<OrbitCacheRecord>(OP_PUT, expires_at_ms, payload))
145 }
146
147 pub fn delete(&self, key: &str) -> Result<NetId64> {
149 let payload = encode_frame(OP_DELETE, self.scoped_key(key).as_bytes(), &[], 0)?;
150 Ok(self
151 .fleet
152 .publish::<OrbitCacheRecord>(OP_DELETE, 0, payload))
153 }
154
155 pub fn reset(&self) -> Result<NetId64> {
159 let payload = encode_frame(OP_RESET, self.prefix.as_bytes(), &[], 0)?;
160 Ok(self.fleet.publish::<OrbitCacheRecord>(OP_RESET, 0, payload))
161 }
162
163 pub fn get(&self, key: &str) -> Option<Vec<u8>> {
165 self.get_entry(key).into_value()
166 }
167
168 pub fn get_entry(&self, key: &str) -> OrbitCacheRead {
170 let scoped = self.scoped_key(key);
171 let expected_key = scoped.as_bytes();
172 let head = self.fleet.head::<OrbitCacheRecord>();
173 if head == 0 {
174 return OrbitCacheRead::Miss { epoch: None };
175 }
176
177 let capacity = self.fleet.ring_capacity::<OrbitCacheRecord>() as u64;
178 let walk_count = head.min(capacity);
179 let now = now_ms();
180
181 for i in 0..walk_count {
182 let counter = head - 1 - i;
183 let Some(frame) = self.fleet.read_at::<OrbitCacheRecord>(counter) else {
184 if counter == 0 {
185 break;
186 }
187 continue;
188 };
189 let Some(decoded) = decode_frame(&frame.payload) else {
190 if counter == 0 {
191 break;
192 }
193 continue;
194 };
195 if decoded.op == OP_RESET && expected_key.starts_with(decoded.key) {
196 return OrbitCacheRead::Miss {
197 epoch: Some(frame.id.counter()),
198 };
199 }
200 if decoded.key != expected_key {
201 if counter == 0 {
202 break;
203 }
204 continue;
205 }
206
207 let epoch = frame.id.counter();
208 return match decoded.op {
209 OP_PUT if decoded.expires_at_ms != 0 && decoded.expires_at_ms <= now => {
210 OrbitCacheRead::Miss { epoch: Some(epoch) }
211 }
212 OP_PUT => OrbitCacheRead::Hit(OrbitCacheEntry {
213 value: frame.payload.slice(decoded.value_start..decoded.value_end),
214 epoch,
215 expires_at_ms: (decoded.expires_at_ms != 0).then_some(decoded.expires_at_ms),
216 }),
217 OP_DELETE => OrbitCacheRead::Miss { epoch: Some(epoch) },
218 _ => OrbitCacheRead::Miss { epoch: Some(epoch) },
219 };
220 }
221
222 OrbitCacheRead::Miss { epoch: None }
223 }
224
225 fn scoped_key(&self, key: &str) -> String {
226 if self.prefix.is_empty() {
227 key.to_string()
228 } else {
229 format!("{}{}", self.prefix, key)
230 }
231 }
232}
233
/// Borrowed view of a decoded cache frame.
struct DecodedFrame<'a> {
    /// Operation byte taken from the first byte of the payload.
    op: u8,
    /// Key bytes, borrowed from the payload.
    key: &'a [u8],
    /// Byte offset in the payload where the value begins.
    value_start: usize,
    /// Byte offset in the payload one past the end of the value.
    value_end: usize,
    /// Expiry timestamp in milliseconds; `0` is written for entries without a TTL.
    expires_at_ms: u64,
}
241
242fn encode_frame(op: u8, key: &[u8], value: &[u8], expires_at_ms: u64) -> Result<Bytes> {
243 let total = HEADER_LEN + key.len() + value.len();
244 if key.len() > u16::MAX as usize || value.len() > u16::MAX as usize || total > CACHE_PAYLOAD_MAX
245 {
246 return Err(Error::CacheFrameTooLarge {
247 key_len: key.len(),
248 value_len: value.len(),
249 max_payload: CACHE_PAYLOAD_MAX,
250 });
251 }
252
253 let mut buf = BytesMut::with_capacity(total);
254 buf.put_u8(op);
255 buf.put_u16_le(key.len() as u16);
256 buf.put_u16_le(value.len() as u16);
257 buf.put_u64_le(expires_at_ms);
258 buf.put_slice(key);
259 buf.put_slice(value);
260 Ok(buf.freeze())
261}
262
263fn decode_frame(payload: &Bytes) -> Option<DecodedFrame<'_>> {
264 if payload.len() < HEADER_LEN {
265 return None;
266 }
267
268 let op = payload[0];
269 let key_len = u16::from_le_bytes(payload[1..3].try_into().ok()?) as usize;
270 let value_len = u16::from_le_bytes(payload[3..5].try_into().ok()?) as usize;
271 let expires_at_ms = u64::from_le_bytes(payload[5..13].try_into().ok()?);
272 let key_start = HEADER_LEN;
273 let key_end = key_start.checked_add(key_len)?;
274 let value_end = key_end.checked_add(value_len)?;
275 if payload.len() < value_end {
276 return None;
277 }
278
279 Some(DecodedFrame {
280 op,
281 key: &payload[key_start..key_end],
282 value_start: key_end,
283 value_end,
284 expires_at_ms,
285 })
286}
287
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch, and `u64::MAX` when the
/// millisecond count does not fit in a `u64`.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => {
            let millis = since_epoch.as_millis();
            if millis > u128::from(u64::MAX) {
                u64::MAX
            } else {
                millis as u64
            }
        }
        Err(_) => 0,
    }
}
294
/// Converts `ttl` to whole milliseconds, clamped to the range `1..=u64::MAX`.
///
/// Durations under one millisecond (including zero) become `1`.
fn duration_ms(ttl: Duration) -> u64 {
    match ttl.as_millis() {
        0 => 1,
        millis if millis > u128::from(u64::MAX) => u64::MAX,
        millis => millis as u64,
    }
}
298
#[cfg(test)]
mod tests {
    use super::OrbitCache;
    use crate::Fleet;

    /// A reset hides values written before it, while puts made afterwards are visible again.
    #[test]
    fn reset_shadows_previous_values_inside_prefix() {
        let cache = OrbitCache::with_prefix(
            Fleet::join("cache_reset", 1).expect("fleet").into(),
            "php:",
        );

        // The value written before the reset is readable.
        cache.put("token", b"old", None).expect("put");
        let before_reset = cache.get("token");
        assert_eq!(before_reset, Some(b"old".to_vec()));

        // The reset shadows it.
        cache.reset().expect("reset");
        let after_reset = cache.get("token");
        assert_eq!(after_reset, None);

        // A fresh put is newer than the reset and therefore wins.
        cache.put("token", b"new", None).expect("put after reset");
        let after_second_put = cache.get("token");
        assert_eq!(after_second_put, Some(b"new".to_vec()));
    }
}