lance_core/cache/codec.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Serialization codecs for cache entries.
5//!
6//! Implement [`CacheCodecImpl`] on concrete types, then use
7//! [`CacheCodec::from_impl`] to produce a type-erased codec for the cache.
8//!
9//! # Wire format
10//!
11//! Every serialized entry begins with a small hand-framed **envelope** so the
12//! reader can validate it before trusting the body:
13//!
14//! ```text
15//! [magic: 4B = b"LCE1"]
16//! [envelope_version: u8]
17//! [type_id_len: u16 LE][type_id: utf8] # stable, author-assigned
18//! [type_version: u32 LE] # per-type body schema version
19//! <body, written by the type's CacheCodecImpl::serialize>
20//! ```
21//!
22//! The envelope is deliberately *not* protobuf: it is the most
23//! stability-critical part, must parse robustly against arbitrary bytes
24//! (including data written by older, pre-stabilization builds), and never
25//! changes shape. Bodies use protobuf headers, where field-number evolution
26//! pays off.
27//!
28//! # Decode outcome
29//!
30//! Deserialization never propagates a parse failure as a hard error into the
31//! cache path. Anything the reader cannot confidently interpret — absent or
32//! wrong magic, an unknown `envelope_version`, a `type_id` mismatch, an
33//! unsupported `type_version`, or a body decode error — becomes
34//! [`CacheDecode::Miss`]. A backend turns `Miss` into a normal cache miss and
35//! recomputes the value. This is what lets data written by an older format
36//! self-heal: it simply fails the magic check and is regenerated.
37
38use std::io::Write;
39use std::sync::Arc;
40
41use bytes::Bytes;
42
43use crate::{Error, Result};
44
45use super::{CacheEntryReader, CacheEntryWriter};
46
47// ---------------------------------------------------------------------------
48// Envelope
49// ---------------------------------------------------------------------------
50
51/// Magic bytes that prefix every stabilized cache entry.
52///
53/// An ASCII tag (`0x4C 0x43 0x45 0x31`) chosen so it cannot collide with any
54/// pre-stabilization blob: those began with either a small little-endian
55/// length (tens of bytes) or a small tag byte, never these values.
56///
57/// Exported so backends can cheaply identify Lance cache entries (e.g. when
58/// scanning a persistent store at startup) without hardcoding the bytes —
59/// prefer [`has_cache_envelope`] over comparing against this directly.
60pub const MAGIC: [u8; 4] = *b"LCE1";
61
62/// Returns `true` if `data` begins with the cache-entry [`MAGIC`].
63///
64/// A cheap prefix check for backends that need to recognize Lance cache
65/// entries without fully [`deserialize`](CacheCodec::deserialize)-ing them. A
66/// `true` result only means the framing looks like ours; the entry can still
67/// decode to a [`Miss`](CacheDecode::Miss) (e.g. wrong `type_id`).
68pub fn has_cache_envelope(data: &[u8]) -> bool {
69 data.get(..MAGIC.len()) == Some(&MAGIC[..])
70}
71
72/// Version of the envelope framing itself. Bumped only if the outer frame
73/// (magic/version/type_id/type_version layout) ever changes — expected never.
74const ENVELOPE_VERSION: u8 = 1;
75
76/// Parsed envelope borrowed from the input bytes.
77struct ParsedEnvelope<'a> {
78 type_id: &'a str,
79 type_version: u32,
80 /// Offset of the first body byte within the input.
81 body_offset: usize,
82}
83
84/// Parse and validate the envelope at the start of `data`.
85///
86/// Returns `None` for anything that is not a well-formed envelope this build
87/// understands (wrong/absent magic, unknown `envelope_version`, truncation,
88/// non-utf8 `type_id`). Callers translate `None` into [`CacheDecode::Miss`].
89fn parse_envelope(data: &Bytes) -> Option<ParsedEnvelope<'_>> {
90 let bytes = data.as_ref();
91 let mut off = 0usize;
92
93 let magic = bytes.get(off..off + 4)?;
94 if magic != MAGIC {
95 return None;
96 }
97 off += 4;
98
99 if *bytes.get(off)? != ENVELOPE_VERSION {
100 return None;
101 }
102 off += 1;
103
104 let type_id_len = u16::from_le_bytes(bytes.get(off..off + 2)?.try_into().ok()?) as usize;
105 off += 2;
106
107 let type_id = std::str::from_utf8(bytes.get(off..off + type_id_len)?).ok()?;
108 off += type_id_len;
109
110 let type_version = u32::from_le_bytes(bytes.get(off..off + 4)?.try_into().ok()?);
111 off += 4;
112
113 Some(ParsedEnvelope {
114 type_id,
115 type_version,
116 body_offset: off,
117 })
118}
119
120/// Write the envelope for `type_id`/`type_version`, returning the number of
121/// bytes written (the body's starting offset).
122fn write_envelope(writer: &mut dyn Write, type_id: &str, type_version: u32) -> Result<usize> {
123 let type_id_len = u16::try_from(type_id.len()).map_err(|_| {
124 Error::io(format!(
125 "cache codec type_id too long ({} bytes, max {})",
126 type_id.len(),
127 u16::MAX
128 ))
129 })?;
130
131 writer.write_all(&MAGIC)?;
132 writer.write_all(&[ENVELOPE_VERSION])?;
133 writer.write_all(&type_id_len.to_le_bytes())?;
134 writer.write_all(type_id.as_bytes())?;
135 writer.write_all(&type_version.to_le_bytes())?;
136
137 Ok(4 + 1 + 2 + type_id.len() + 4)
138}
139
140// ---------------------------------------------------------------------------
141// CacheDecode — first-class cache-miss outcome
142// ---------------------------------------------------------------------------
143
144/// Why a cache entry could not be decoded into the expected type.
145///
146/// Carried by [`CacheDecode::Miss`] so backends can emit targeted metrics
147/// (e.g. distinguish "evicting due to a stale format" from "type collision")
148/// without re-parsing. Every reason maps to the same behavior — recompute via
149/// the loader — so callers that don't care can ignore it.
150#[derive(Debug, Clone, Copy, PartialEq, Eq)]
151pub enum CacheMissReason {
152 /// Absent or wrong magic, unknown `envelope_version`, truncated framing, or
153 /// a non-utf8 `type_id`. Typically an entry written by a pre-stabilization
154 /// or otherwise foreign build.
155 InvalidEnvelope,
156 /// Well-formed envelope, but its `type_id` names a different entry type than
157 /// the codec reading it.
158 TypeMismatch,
159 /// Written by a newer build whose `type_version` this build does not
160 /// understand and must not attempt to interpret.
161 VersionTooNew,
162 /// Envelope validated, but the body failed to decode (truncation, a
163 /// malformed protobuf header, an IPC error, etc.).
164 BodyError,
165}
166
167/// Outcome of deserializing a cache entry.
168///
169/// `Miss` means the bytes could not be confidently decoded into `T`; the
170/// [`CacheMissReason`] says why. A backend treats any `Miss` exactly like a key
171/// that was never present: recompute via the loader.
172#[derive(Debug)]
173pub enum CacheDecode<T> {
174 Hit(T),
175 Miss(CacheMissReason),
176}
177
178impl<T> CacheDecode<T> {
179 pub fn hit(self) -> Option<T> {
180 match self {
181 Self::Hit(v) => Some(v),
182 Self::Miss(_) => None,
183 }
184 }
185}
186
187// ---------------------------------------------------------------------------
188// CacheCodecImpl — trait for serializable cache entry types
189// ---------------------------------------------------------------------------
190
191/// Serialization trait for cache entries.
192///
193/// **Experimental**: the serialized format is not yet covered by a stability
194/// guarantee and may change between releases. When it does stabilize, the
195/// rules are: `TYPE_ID`, protobuf field numbers, and enum values are
196/// append-only forever; format changes that protobuf cannot express
197/// transparently bump [`CURRENT_VERSION`](Self::CURRENT_VERSION).
198///
199/// Implement this on concrete types that need to survive serialization through
200/// a persistent cache backend, then wire it into a
201/// [`CacheKey`](super::CacheKey) via [`CacheCodec::from_impl`].
202///
203/// The envelope (magic/version/type_id/type_version) is written and validated
204/// by the [`CacheCodec`] wrapper. [`serialize`](Self::serialize) writes only
205/// the body — a header followed by sections in a fixed, version-keyed order —
206/// and [`deserialize`](Self::deserialize) reads them back in that same order.
207/// The read sequence mirroring the write sequence for each `type_version` is
208/// the invariant the implementor owns.
209pub trait CacheCodecImpl: Send + Sync {
210 /// Stable identity for this entry type. **Must not change once shipped.**
211 /// This is a deliberate author-assigned string, not `std::any::type_name`
212 /// (which is not stable across compiler versions).
213 const TYPE_ID: &'static str;
214
215 /// Body schema version this build writes. Bump when the body layout
216 /// changes in a way protobuf field additions cannot express transparently
217 /// (adding/removing/reordering sections, a raw-blob encoding change, etc.).
218 const CURRENT_VERSION: u32;
219
220 /// Write the body: a header, then sections in a fixed order.
221 fn serialize(&self, writer: &mut CacheEntryWriter<'_>) -> Result<()>;
222
223 /// Reconstruct from the body. Branch on
224 /// [`reader.version()`](CacheEntryReader::version) for backward compat;
225 /// sections are read in write order.
226 fn deserialize(reader: &mut CacheEntryReader<'_>) -> Result<Self>
227 where
228 Self: Sized;
229}
230
231// ---------------------------------------------------------------------------
232// CacheCodec — type-erased codec passed to backends
233// ---------------------------------------------------------------------------
234
235pub(crate) type ArcAny = Arc<dyn std::any::Any + Send + Sync>;
236
237/// Type-erased codec for serializing and deserializing cache entries.
238///
239/// `CacheCodec` carries the entry's stable `type_id`/`version` plus two plain
240/// function pointers — it is `Copy` and has no heap allocation. Construct one
241/// via [`CacheCodec::from_impl`] for types that implement [`CacheCodecImpl`],
242/// or [`CacheCodec::new`] for custom cases (e.g. when the orphan rule prevents
243/// a direct impl).
244#[derive(Copy, Clone)]
245pub struct CacheCodec {
246 type_id: &'static str,
247 version: u32,
248 serialize_body: fn(&ArcAny, &mut CacheEntryWriter<'_>) -> Result<()>,
249 deserialize_body: fn(&mut CacheEntryReader<'_>) -> Result<ArcAny>,
250}
251
252impl std::fmt::Debug for CacheCodec {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 f.debug_struct("CacheCodec")
255 .field("type_id", &self.type_id)
256 .field("version", &self.version)
257 .finish_non_exhaustive()
258 }
259}
260
261fn serialize_via_impl<T: CacheCodecImpl + 'static>(
262 any: &ArcAny,
263 writer: &mut CacheEntryWriter<'_>,
264) -> Result<()> {
265 let val = any
266 .downcast_ref::<T>()
267 .expect("CacheCodec::serialize called with wrong type (this is a bug in the cache layer)");
268 val.serialize(writer)
269}
270
271fn deserialize_via_impl<T: CacheCodecImpl + 'static>(
272 reader: &mut CacheEntryReader<'_>,
273) -> Result<ArcAny> {
274 let val = T::deserialize(reader)?;
275 Ok(Arc::new(val) as ArcAny)
276}
277
278impl CacheCodec {
279 /// Create a `CacheCodec` from explicit body function pointers.
280 ///
281 /// Prefer [`from_impl`](Self::from_impl) when the value type implements
282 /// [`CacheCodecImpl`]. Use this for types where a direct impl isn't
283 /// possible (e.g. the orphan rule prevents it). `type_id` and `version`
284 /// play the same role as the corresponding [`CacheCodecImpl`] constants.
285 pub fn new(
286 type_id: &'static str,
287 version: u32,
288 serialize_body: fn(&ArcAny, &mut CacheEntryWriter<'_>) -> Result<()>,
289 deserialize_body: fn(&mut CacheEntryReader<'_>) -> Result<ArcAny>,
290 ) -> Self {
291 Self {
292 type_id,
293 version,
294 serialize_body,
295 deserialize_body,
296 }
297 }
298
299 /// Create a `CacheCodec` from a [`CacheCodecImpl`] implementation.
300 pub fn from_impl<T: CacheCodecImpl + 'static>() -> Self {
301 Self {
302 type_id: T::TYPE_ID,
303 version: T::CURRENT_VERSION,
304 serialize_body: serialize_via_impl::<T>,
305 deserialize_body: deserialize_via_impl::<T>,
306 }
307 }
308
309 /// Serialize `value` into `writer`: envelope first, then the body.
310 pub fn serialize(&self, value: &ArcAny, writer: &mut dyn Write) -> Result<()> {
311 let body_offset = write_envelope(writer, self.type_id, self.version)?;
312 let mut entry_writer = CacheEntryWriter::with_pos(writer, body_offset);
313 (self.serialize_body)(value, &mut entry_writer)
314 }
315
316 /// Deserialize an entry from `data`.
317 ///
318 /// Never fails: any non-fatal failure to interpret the bytes becomes a
319 /// [`CacheDecode::Miss`] with the reason why (see [`CacheMissReason`]).
320 /// Reading from an in-memory [`Bytes`] cannot do I/O, so there is no fault
321 /// channel — a miss is the only non-`Hit` outcome.
322 pub fn deserialize(&self, data: &Bytes) -> CacheDecode<ArcAny> {
323 let Some(envelope) = parse_envelope(data) else {
324 log::debug!("cache entry rejected: missing or invalid envelope");
325 return CacheDecode::Miss(CacheMissReason::InvalidEnvelope);
326 };
327
328 if envelope.type_id != self.type_id {
329 log::debug!(
330 "cache entry type_id mismatch: got {:?}, expected {:?}",
331 envelope.type_id,
332 self.type_id
333 );
334 return CacheDecode::Miss(CacheMissReason::TypeMismatch);
335 }
336
337 // A version newer than this build writes was produced by a newer build
338 // whose body layout we cannot assume to understand. Older/equal versions
339 // are the impl's responsibility to handle (branching on reader.version()).
340 if envelope.type_version > self.version {
341 log::debug!(
342 "cache entry {:?} has unsupported type_version {} (this build writes {})",
343 self.type_id,
344 envelope.type_version,
345 self.version
346 );
347 return CacheDecode::Miss(CacheMissReason::VersionTooNew);
348 }
349
350 let mut reader = CacheEntryReader::new(data, envelope.body_offset, envelope.type_version);
351 match (self.deserialize_body)(&mut reader) {
352 Ok(value) => CacheDecode::Hit(value),
353 Err(e) => {
354 log::debug!(
355 "cache entry {:?} v{} failed to decode: {e}",
356 self.type_id,
357 envelope.type_version
358 );
359 CacheDecode::Miss(CacheMissReason::BodyError)
360 }
361 }
362 }
363}
364
365#[cfg(test)]
366mod tests {
367 use super::*;
368
369 /// A trivial codec used to exercise the envelope and miss semantics
370 /// without pulling in arrow-backed payloads.
371 #[derive(Debug, PartialEq)]
372 struct Widget {
373 n: u32,
374 }
375
376 impl CacheCodecImpl for Widget {
377 const TYPE_ID: &'static str = "test.Widget";
378 const CURRENT_VERSION: u32 = 1;
379
380 fn serialize(&self, writer: &mut CacheEntryWriter<'_>) -> Result<()> {
381 writer.write_raw(&self.n.to_le_bytes())
382 }
383
384 fn deserialize(reader: &mut CacheEntryReader<'_>) -> Result<Self> {
385 let bytes = reader.read_raw()?;
386 let n = u32::from_le_bytes(
387 bytes
388 .as_ref()
389 .try_into()
390 .map_err(|_| Error::io("bad widget".to_string()))?,
391 );
392 Ok(Self { n })
393 }
394 }
395
396 fn serialize_widget(widget: &Widget) -> Bytes {
397 let codec = CacheCodec::from_impl::<Widget>();
398 let any: ArcAny = Arc::new(Widget { n: widget.n });
399 let mut buf = Vec::new();
400 codec.serialize(&any, &mut buf).unwrap();
401 Bytes::from(buf)
402 }
403
404 /// The miss reason, or `None` if the decode was a hit.
405 fn miss_reason(data: &Bytes) -> Option<CacheMissReason> {
406 match deserialize_widget(data) {
407 CacheDecode::Hit(_) => None,
408 CacheDecode::Miss(reason) => Some(reason),
409 }
410 }
411
412 fn deserialize_widget(data: &Bytes) -> CacheDecode<Widget> {
413 let codec = CacheCodec::from_impl::<Widget>();
414 match codec.deserialize(data) {
415 CacheDecode::Hit(any) => {
416 CacheDecode::Hit(Arc::try_unwrap(any.downcast::<Widget>().unwrap()).unwrap())
417 }
418 CacheDecode::Miss(reason) => CacheDecode::Miss(reason),
419 }
420 }
421
422 #[test]
423 fn envelope_roundtrip_hits() {
424 let bytes = serialize_widget(&Widget { n: 0xDEADBEEF });
425 // Sanity: the entry starts with the magic.
426 assert_eq!(&bytes[..4], b"LCE1");
427 let decoded = deserialize_widget(&bytes).hit().unwrap();
428 assert_eq!(decoded, Widget { n: 0xDEADBEEF });
429 }
430
431 #[test]
432 fn has_cache_envelope_detects_magic() {
433 let bytes = serialize_widget(&Widget { n: 1 });
434 assert!(has_cache_envelope(&bytes));
435 assert!(has_cache_envelope(&MAGIC)); // exactly the magic, nothing after
436 assert!(!has_cache_envelope(b"LCE")); // too short
437 assert!(!has_cache_envelope(b"JUNK and more"));
438 assert!(!has_cache_envelope(&[]));
439 }
440
441 #[test]
442 fn wrong_magic_is_miss() {
443 let mut bytes = serialize_widget(&Widget { n: 7 }).to_vec();
444 bytes[0] = b'X';
445 assert_eq!(
446 miss_reason(&Bytes::from(bytes)),
447 Some(CacheMissReason::InvalidEnvelope)
448 );
449 }
450
451 #[test]
452 fn pre_stabilization_blob_is_miss() {
453 // An old unstable blob led with a small u64 LE length prefix (a JSON
454 // header of tens of bytes) — no magic. It must self-heal to a miss.
455 let mut blob = Vec::new();
456 blob.extend_from_slice(&(42u64).to_le_bytes());
457 blob.extend_from_slice(&[0u8; 42]);
458 assert_eq!(
459 miss_reason(&Bytes::from(blob)),
460 Some(CacheMissReason::InvalidEnvelope)
461 );
462
463 // A different unstable shape led with a small u8 tag (0/1/2).
464 assert_eq!(
465 miss_reason(&Bytes::from(vec![0u8, 1, 2, 3])),
466 Some(CacheMissReason::InvalidEnvelope)
467 );
468 }
469
470 #[test]
471 fn unknown_envelope_version_is_miss() {
472 let mut bytes = serialize_widget(&Widget { n: 7 }).to_vec();
473 bytes[4] = 0xFF; // envelope_version byte
474 assert_eq!(
475 miss_reason(&Bytes::from(bytes)),
476 Some(CacheMissReason::InvalidEnvelope)
477 );
478 }
479
480 #[test]
481 fn type_id_mismatch_is_miss() {
482 // Hand-build an envelope with a foreign type_id but valid framing.
483 let mut buf = Vec::new();
484 write_envelope(&mut buf, "some.OtherType", 1).unwrap();
485 buf.extend_from_slice(&(4u64).to_le_bytes());
486 buf.extend_from_slice(&99u32.to_le_bytes());
487 assert_eq!(
488 miss_reason(&Bytes::from(buf)),
489 Some(CacheMissReason::TypeMismatch)
490 );
491 }
492
493 #[test]
494 fn unsupported_future_type_version_is_miss() {
495 // An entry written by a newer build (higher type_version) must miss
496 // rather than be misread by this build.
497 let mut buf = Vec::new();
498 write_envelope(&mut buf, Widget::TYPE_ID, Widget::CURRENT_VERSION + 1).unwrap();
499 lance_arrow::ipc::write_len_prefixed_bytes(&mut buf, &9u32.to_le_bytes()).unwrap();
500 assert_eq!(
501 miss_reason(&Bytes::from(buf)),
502 Some(CacheMissReason::VersionTooNew)
503 );
504 }
505
506 #[test]
507 fn truncated_envelope_is_miss() {
508 let bytes = serialize_widget(&Widget { n: 7 });
509 for cut in [0, 1, 4, 5, 7, 9] {
510 assert_eq!(
511 miss_reason(&bytes.slice(..cut.min(bytes.len()))),
512 Some(CacheMissReason::InvalidEnvelope),
513 "truncating to {cut} bytes should miss as InvalidEnvelope"
514 );
515 }
516 }
517
518 #[test]
519 fn body_decode_error_is_miss() {
520 // Valid envelope, but the body is too short for the widget.
521 let mut buf = Vec::new();
522 write_envelope(&mut buf, Widget::TYPE_ID, Widget::CURRENT_VERSION).unwrap();
523 buf.extend_from_slice(&(1u64).to_le_bytes());
524 buf.push(0u8);
525 assert_eq!(
526 miss_reason(&Bytes::from(buf)),
527 Some(CacheMissReason::BodyError)
528 );
529 }
530
531 #[test]
532 fn reader_exposes_envelope_version() {
533 // type_version travels through the envelope to reader.version().
534 let mut buf = Vec::new();
535 write_envelope(&mut buf, Widget::TYPE_ID, 7).unwrap();
536 let body_off = buf.len();
537 // A widget body so the codec can decode it.
538 lance_arrow::ipc::write_len_prefixed_bytes(&mut buf, &5u32.to_le_bytes()).unwrap();
539 let data = Bytes::from(buf);
540
541 let mut r = CacheEntryReader::new(&data, body_off, 7);
542 assert_eq!(r.version(), 7);
543 assert_eq!(r.read_raw().unwrap().as_ref(), 5u32.to_le_bytes());
544 }
545}