Skip to main content

lance_core/cache/
codec.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Serialization codecs for cache entries.
5//!
6//! Implement [`CacheCodecImpl`] on concrete types, then use
7//! [`CacheCodec::from_impl`] to produce a type-erased codec for the cache.
8//!
9//! # Wire format
10//!
11//! Every serialized entry begins with a small hand-framed **envelope** so the
12//! reader can validate it before trusting the body:
13//!
14//! ```text
15//! [magic: 4B = b"LCE1"]
16//! [envelope_version: u8]
17//! [type_id_len: u16 LE][type_id: utf8]   # stable, author-assigned
18//! [type_version: u32 LE]                 # per-type body schema version
19//! <body, written by the type's CacheCodecImpl::serialize>
20//! ```
21//!
22//! The envelope is deliberately *not* protobuf: it is the most
23//! stability-critical part, must parse robustly against arbitrary bytes
24//! (including data written by older, pre-stabilization builds), and never
25//! changes shape. Bodies use protobuf headers, where field-number evolution
26//! pays off.
27//!
28//! # Decode outcome
29//!
30//! Deserialization never propagates a parse failure as a hard error into the
31//! cache path. Anything the reader cannot confidently interpret — absent or
32//! wrong magic, an unknown `envelope_version`, a `type_id` mismatch, an
33//! unsupported `type_version`, or a body decode error — becomes
34//! [`CacheDecode::Miss`]. A backend turns `Miss` into a normal cache miss and
35//! recomputes the value. This is what lets data written by an older format
36//! self-heal: it simply fails the magic check and is regenerated.
37
38use std::io::Write;
39use std::sync::Arc;
40
41use bytes::Bytes;
42
43use crate::{Error, Result};
44
45use super::{CacheEntryReader, CacheEntryWriter};
46
47// ---------------------------------------------------------------------------
48// Envelope
49// ---------------------------------------------------------------------------
50
/// Magic bytes that prefix every stabilized cache entry.
///
/// An ASCII tag (`0x4C 0x43 0x45 0x31`) chosen so it cannot collide with any
/// pre-stabilization blob: those began with either a small little-endian
/// length (tens of bytes) or a small tag byte, so their first four bytes never
/// form this sequence.
///
/// Exported so backends can cheaply identify Lance cache entries (e.g. when
/// scanning a persistent store at startup) without hardcoding the bytes —
/// prefer [`has_cache_envelope`] over comparing against this directly.
pub const MAGIC: [u8; 4] = *b"LCE1";

/// Returns `true` if `data` begins with the cache-entry [`MAGIC`].
///
/// A cheap prefix check for backends that need to recognize Lance cache
/// entries without fully [`deserialize`](CacheCodec::deserialize)-ing them. A
/// `true` result only means the framing looks like ours; the entry can still
/// decode to a [`Miss`](CacheDecode::Miss) (e.g. wrong `type_id`). Inputs
/// shorter than the magic (including an empty slice) return `false`.
#[must_use]
pub fn has_cache_envelope(data: &[u8]) -> bool {
    data.starts_with(&MAGIC)
}
71
/// Version tag of the outer frame itself — the magic / envelope version /
/// type_id / type_version layout, not any entry's body. It only changes if
/// that frame ever changes shape, which is not expected to happen.
const ENVELOPE_VERSION: u8 = 0x01;
75
/// A validated envelope whose `type_id` borrows from the input bytes.
struct ParsedEnvelope<'a> {
    /// Index of the first body byte within the input (the envelope's length).
    body_offset: usize,
    /// Entry type identity recorded in the envelope.
    type_id: &'a str,
    /// Body schema version recorded in the envelope.
    type_version: u32,
}
83
84/// Parse and validate the envelope at the start of `data`.
85///
86/// Returns `None` for anything that is not a well-formed envelope this build
87/// understands (wrong/absent magic, unknown `envelope_version`, truncation,
88/// non-utf8 `type_id`). Callers translate `None` into [`CacheDecode::Miss`].
89fn parse_envelope(data: &Bytes) -> Option<ParsedEnvelope<'_>> {
90    let bytes = data.as_ref();
91    let mut off = 0usize;
92
93    let magic = bytes.get(off..off + 4)?;
94    if magic != MAGIC {
95        return None;
96    }
97    off += 4;
98
99    if *bytes.get(off)? != ENVELOPE_VERSION {
100        return None;
101    }
102    off += 1;
103
104    let type_id_len = u16::from_le_bytes(bytes.get(off..off + 2)?.try_into().ok()?) as usize;
105    off += 2;
106
107    let type_id = std::str::from_utf8(bytes.get(off..off + type_id_len)?).ok()?;
108    off += type_id_len;
109
110    let type_version = u32::from_le_bytes(bytes.get(off..off + 4)?.try_into().ok()?);
111    off += 4;
112
113    Some(ParsedEnvelope {
114        type_id,
115        type_version,
116        body_offset: off,
117    })
118}
119
120/// Write the envelope for `type_id`/`type_version`, returning the number of
121/// bytes written (the body's starting offset).
122fn write_envelope(writer: &mut dyn Write, type_id: &str, type_version: u32) -> Result<usize> {
123    let type_id_len = u16::try_from(type_id.len()).map_err(|_| {
124        Error::io(format!(
125            "cache codec type_id too long ({} bytes, max {})",
126            type_id.len(),
127            u16::MAX
128        ))
129    })?;
130
131    writer.write_all(&MAGIC)?;
132    writer.write_all(&[ENVELOPE_VERSION])?;
133    writer.write_all(&type_id_len.to_le_bytes())?;
134    writer.write_all(type_id.as_bytes())?;
135    writer.write_all(&type_version.to_le_bytes())?;
136
137    Ok(4 + 1 + 2 + type_id.len() + 4)
138}
139
140// ---------------------------------------------------------------------------
141// CacheDecode — first-class cache-miss outcome
142// ---------------------------------------------------------------------------
143
/// Why a cache entry could not be decoded into the expected type.
///
/// Carried by [`CacheDecode::Miss`] so backends can emit targeted metrics
/// (e.g. distinguish "evicting due to a stale format" from "type collision")
/// without re-parsing. Every reason maps to the same behavior — recompute via
/// the loader — so callers that don't care can ignore it.
///
/// Derives `Hash` alongside `Eq` so a backend can key per-reason counters in a
/// `HashMap`/`HashSet` directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheMissReason {
    /// Absent or wrong magic, unknown `envelope_version`, truncated framing, or
    /// a non-utf8 `type_id`. Typically an entry written by a pre-stabilization
    /// or otherwise foreign build.
    InvalidEnvelope,
    /// Well-formed envelope, but its `type_id` names a different entry type than
    /// the codec reading it.
    TypeMismatch,
    /// Written by a newer build whose `type_version` this build does not
    /// understand and must not attempt to interpret.
    VersionTooNew,
    /// Envelope validated, but the body failed to decode (truncation, a
    /// malformed protobuf header, an IPC error, etc.).
    BodyError,
}

/// Outcome of deserializing a cache entry.
///
/// `Miss` means the bytes could not be confidently decoded into `T`; the
/// [`CacheMissReason`] says why. A backend treats any `Miss` exactly like a key
/// that was never present: recompute via the loader.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CacheDecode<T> {
    /// The entry decoded successfully into a `T`.
    Hit(T),
    /// The entry could not be interpreted; the payload says why.
    Miss(CacheMissReason),
}

impl<T> CacheDecode<T> {
    /// Converts into an `Option`, keeping a hit value and discarding the miss
    /// reason.
    #[must_use]
    pub fn hit(self) -> Option<T> {
        match self {
            Self::Hit(v) => Some(v),
            Self::Miss(_) => None,
        }
    }

    /// Returns `true` if this is a [`Hit`](Self::Hit).
    #[must_use]
    pub fn is_hit(&self) -> bool {
        matches!(self, Self::Hit(_))
    }

    /// Returns why the decode missed, or `None` for a hit.
    #[must_use]
    pub fn miss_reason(&self) -> Option<CacheMissReason> {
        match self {
            Self::Hit(_) => None,
            Self::Miss(reason) => Some(*reason),
        }
    }

    /// Transforms a hit value with `f` (e.g. downcasting a type-erased value),
    /// passing a miss and its reason through unchanged.
    pub fn map<U>(self, f: impl FnOnce(T) -> U) -> CacheDecode<U> {
        match self {
            Self::Hit(v) => CacheDecode::Hit(f(v)),
            Self::Miss(reason) => CacheDecode::Miss(reason),
        }
    }
}
186
187// ---------------------------------------------------------------------------
188// CacheCodecImpl — trait for serializable cache entry types
189// ---------------------------------------------------------------------------
190
191/// Serialization trait for cache entries.
192///
193/// **Experimental**: the serialized format is not yet covered by a stability
194/// guarantee and may change between releases. When it does stabilize, the
195/// rules are: `TYPE_ID`, protobuf field numbers, and enum values are
196/// append-only forever; format changes that protobuf cannot express
197/// transparently bump [`CURRENT_VERSION`](Self::CURRENT_VERSION).
198///
199/// Implement this on concrete types that need to survive serialization through
200/// a persistent cache backend, then wire it into a
201/// [`CacheKey`](super::CacheKey) via [`CacheCodec::from_impl`].
202///
203/// The envelope (magic/version/type_id/type_version) is written and validated
204/// by the [`CacheCodec`] wrapper. [`serialize`](Self::serialize) writes only
205/// the body — a header followed by sections in a fixed, version-keyed order —
206/// and [`deserialize`](Self::deserialize) reads them back in that same order.
207/// The read sequence mirroring the write sequence for each `type_version` is
208/// the invariant the implementor owns.
209pub trait CacheCodecImpl: Send + Sync {
210    /// Stable identity for this entry type. **Must not change once shipped.**
211    /// This is a deliberate author-assigned string, not `std::any::type_name`
212    /// (which is not stable across compiler versions).
213    const TYPE_ID: &'static str;
214
215    /// Body schema version this build writes. Bump when the body layout
216    /// changes in a way protobuf field additions cannot express transparently
217    /// (adding/removing/reordering sections, a raw-blob encoding change, etc.).
218    const CURRENT_VERSION: u32;
219
220    /// Write the body: a header, then sections in a fixed order.
221    fn serialize(&self, writer: &mut CacheEntryWriter<'_>) -> Result<()>;
222
223    /// Reconstruct from the body. Branch on
224    /// [`reader.version()`](CacheEntryReader::version) for backward compat;
225    /// sections are read in write order.
226    fn deserialize(reader: &mut CacheEntryReader<'_>) -> Result<Self>
227    where
228        Self: Sized;
229}
230
231// ---------------------------------------------------------------------------
232// CacheCodec — type-erased codec passed to backends
233// ---------------------------------------------------------------------------
234
235pub(crate) type ArcAny = Arc<dyn std::any::Any + Send + Sync>;
236
237/// Type-erased codec for serializing and deserializing cache entries.
238///
239/// `CacheCodec` carries the entry's stable `type_id`/`version` plus two plain
240/// function pointers — it is `Copy` and has no heap allocation. Construct one
241/// via [`CacheCodec::from_impl`] for types that implement [`CacheCodecImpl`],
242/// or [`CacheCodec::new`] for custom cases (e.g. when the orphan rule prevents
243/// a direct impl).
244#[derive(Copy, Clone)]
245pub struct CacheCodec {
246    type_id: &'static str,
247    version: u32,
248    serialize_body: fn(&ArcAny, &mut CacheEntryWriter<'_>) -> Result<()>,
249    deserialize_body: fn(&mut CacheEntryReader<'_>) -> Result<ArcAny>,
250}
251
252impl std::fmt::Debug for CacheCodec {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        f.debug_struct("CacheCodec")
255            .field("type_id", &self.type_id)
256            .field("version", &self.version)
257            .finish_non_exhaustive()
258    }
259}
260
261fn serialize_via_impl<T: CacheCodecImpl + 'static>(
262    any: &ArcAny,
263    writer: &mut CacheEntryWriter<'_>,
264) -> Result<()> {
265    let val = any
266        .downcast_ref::<T>()
267        .expect("CacheCodec::serialize called with wrong type (this is a bug in the cache layer)");
268    val.serialize(writer)
269}
270
271fn deserialize_via_impl<T: CacheCodecImpl + 'static>(
272    reader: &mut CacheEntryReader<'_>,
273) -> Result<ArcAny> {
274    let val = T::deserialize(reader)?;
275    Ok(Arc::new(val) as ArcAny)
276}
277
278impl CacheCodec {
279    /// Create a `CacheCodec` from explicit body function pointers.
280    ///
281    /// Prefer [`from_impl`](Self::from_impl) when the value type implements
282    /// [`CacheCodecImpl`]. Use this for types where a direct impl isn't
283    /// possible (e.g. the orphan rule prevents it). `type_id` and `version`
284    /// play the same role as the corresponding [`CacheCodecImpl`] constants.
285    pub fn new(
286        type_id: &'static str,
287        version: u32,
288        serialize_body: fn(&ArcAny, &mut CacheEntryWriter<'_>) -> Result<()>,
289        deserialize_body: fn(&mut CacheEntryReader<'_>) -> Result<ArcAny>,
290    ) -> Self {
291        Self {
292            type_id,
293            version,
294            serialize_body,
295            deserialize_body,
296        }
297    }
298
299    /// Create a `CacheCodec` from a [`CacheCodecImpl`] implementation.
300    pub fn from_impl<T: CacheCodecImpl + 'static>() -> Self {
301        Self {
302            type_id: T::TYPE_ID,
303            version: T::CURRENT_VERSION,
304            serialize_body: serialize_via_impl::<T>,
305            deserialize_body: deserialize_via_impl::<T>,
306        }
307    }
308
309    /// Serialize `value` into `writer`: envelope first, then the body.
310    pub fn serialize(&self, value: &ArcAny, writer: &mut dyn Write) -> Result<()> {
311        let body_offset = write_envelope(writer, self.type_id, self.version)?;
312        let mut entry_writer = CacheEntryWriter::with_pos(writer, body_offset);
313        (self.serialize_body)(value, &mut entry_writer)
314    }
315
316    /// Deserialize an entry from `data`.
317    ///
318    /// Never fails: any non-fatal failure to interpret the bytes becomes a
319    /// [`CacheDecode::Miss`] with the reason why (see [`CacheMissReason`]).
320    /// Reading from an in-memory [`Bytes`] cannot do I/O, so there is no fault
321    /// channel — a miss is the only non-`Hit` outcome.
322    pub fn deserialize(&self, data: &Bytes) -> CacheDecode<ArcAny> {
323        let Some(envelope) = parse_envelope(data) else {
324            log::debug!("cache entry rejected: missing or invalid envelope");
325            return CacheDecode::Miss(CacheMissReason::InvalidEnvelope);
326        };
327
328        if envelope.type_id != self.type_id {
329            log::debug!(
330                "cache entry type_id mismatch: got {:?}, expected {:?}",
331                envelope.type_id,
332                self.type_id
333            );
334            return CacheDecode::Miss(CacheMissReason::TypeMismatch);
335        }
336
337        // A version newer than this build writes was produced by a newer build
338        // whose body layout we cannot assume to understand. Older/equal versions
339        // are the impl's responsibility to handle (branching on reader.version()).
340        if envelope.type_version > self.version {
341            log::debug!(
342                "cache entry {:?} has unsupported type_version {} (this build writes {})",
343                self.type_id,
344                envelope.type_version,
345                self.version
346            );
347            return CacheDecode::Miss(CacheMissReason::VersionTooNew);
348        }
349
350        let mut reader = CacheEntryReader::new(data, envelope.body_offset, envelope.type_version);
351        match (self.deserialize_body)(&mut reader) {
352            Ok(value) => CacheDecode::Hit(value),
353            Err(e) => {
354                log::debug!(
355                    "cache entry {:?} v{} failed to decode: {e}",
356                    self.type_id,
357                    envelope.type_version
358                );
359                CacheDecode::Miss(CacheMissReason::BodyError)
360            }
361        }
362    }
363}
364
#[cfg(test)]
mod tests {
    use super::*;

    /// A trivial codec used to exercise the envelope and miss semantics
    /// without pulling in arrow-backed payloads.
    #[derive(Debug, PartialEq)]
    struct Widget {
        n: u32,
    }

    impl CacheCodecImpl for Widget {
        const TYPE_ID: &'static str = "test.Widget";
        const CURRENT_VERSION: u32 = 1;

        fn serialize(&self, writer: &mut CacheEntryWriter<'_>) -> Result<()> {
            writer.write_raw(&self.n.to_le_bytes())
        }

        fn deserialize(reader: &mut CacheEntryReader<'_>) -> Result<Self> {
            let bytes = reader.read_raw()?;
            let n = u32::from_le_bytes(
                bytes
                    .as_ref()
                    .try_into()
                    .map_err(|_| Error::io("bad widget".to_string()))?,
            );
            Ok(Self { n })
        }
    }

    fn serialize_widget(widget: &Widget) -> Bytes {
        let codec = CacheCodec::from_impl::<Widget>();
        let any: ArcAny = Arc::new(Widget { n: widget.n });
        let mut buf = Vec::new();
        codec.serialize(&any, &mut buf).unwrap();
        Bytes::from(buf)
    }

    /// The miss reason, or `None` if the decode was a hit.
    fn miss_reason(data: &Bytes) -> Option<CacheMissReason> {
        match deserialize_widget(data) {
            CacheDecode::Hit(_) => None,
            CacheDecode::Miss(reason) => Some(reason),
        }
    }

    fn deserialize_widget(data: &Bytes) -> CacheDecode<Widget> {
        let codec = CacheCodec::from_impl::<Widget>();
        match codec.deserialize(data) {
            CacheDecode::Hit(any) => {
                CacheDecode::Hit(Arc::try_unwrap(any.downcast::<Widget>().unwrap()).unwrap())
            }
            CacheDecode::Miss(reason) => CacheDecode::Miss(reason),
        }
    }

    #[test]
    fn envelope_roundtrip_hits() {
        let bytes = serialize_widget(&Widget { n: 0xDEADBEEF });
        // Sanity: the entry starts with the magic.
        assert_eq!(&bytes[..4], b"LCE1");
        let decoded = deserialize_widget(&bytes).hit().unwrap();
        assert_eq!(decoded, Widget { n: 0xDEADBEEF });
    }

    #[test]
    fn has_cache_envelope_detects_magic() {
        let bytes = serialize_widget(&Widget { n: 1 });
        assert!(has_cache_envelope(&bytes));
        assert!(has_cache_envelope(&MAGIC)); // exactly the magic, nothing after
        assert!(!has_cache_envelope(b"LCE")); // too short
        assert!(!has_cache_envelope(b"LCE2")); // last byte differs
        assert!(!has_cache_envelope(b"JUNK and more"));
        assert!(!has_cache_envelope(&[]));
    }

    #[test]
    fn wrong_magic_is_miss() {
        let mut bytes = serialize_widget(&Widget { n: 7 }).to_vec();
        bytes[0] = b'X';
        assert_eq!(
            miss_reason(&Bytes::from(bytes)),
            Some(CacheMissReason::InvalidEnvelope)
        );
    }

    #[test]
    fn pre_stabilization_blob_is_miss() {
        // An old unstable blob led with a small u64 LE length prefix (a JSON
        // header of tens of bytes) — no magic. It must self-heal to a miss.
        let mut blob = Vec::new();
        blob.extend_from_slice(&(42u64).to_le_bytes());
        blob.extend_from_slice(&[0u8; 42]);
        assert_eq!(
            miss_reason(&Bytes::from(blob)),
            Some(CacheMissReason::InvalidEnvelope)
        );

        // A different unstable shape led with a small u8 tag (0/1/2).
        assert_eq!(
            miss_reason(&Bytes::from(vec![0u8, 1, 2, 3])),
            Some(CacheMissReason::InvalidEnvelope)
        );
    }

    #[test]
    fn unknown_envelope_version_is_miss() {
        let mut bytes = serialize_widget(&Widget { n: 7 }).to_vec();
        bytes[4] = 0xFF; // envelope_version byte
        assert_eq!(
            miss_reason(&Bytes::from(bytes)),
            Some(CacheMissReason::InvalidEnvelope)
        );
    }

    #[test]
    fn type_id_mismatch_is_miss() {
        // Hand-build an envelope with a foreign type_id but valid framing.
        let mut buf: Vec<u8> = Vec::new();
        write_envelope(&mut buf, "some.OtherType", 1).unwrap();
        buf.extend_from_slice(&(4u64).to_le_bytes());
        buf.extend_from_slice(&99u32.to_le_bytes());
        assert_eq!(
            miss_reason(&Bytes::from(buf)),
            Some(CacheMissReason::TypeMismatch)
        );
    }

    #[test]
    fn unsupported_future_type_version_is_miss() {
        // An entry written by a newer build (higher type_version) must miss
        // rather than be misread by this build.
        let mut buf: Vec<u8> = Vec::new();
        write_envelope(&mut buf, Widget::TYPE_ID, Widget::CURRENT_VERSION + 1).unwrap();
        lance_arrow::ipc::write_len_prefixed_bytes(&mut buf, &9u32.to_le_bytes()).unwrap();
        assert_eq!(
            miss_reason(&Bytes::from(buf)),
            Some(CacheMissReason::VersionTooNew)
        );
    }

    #[test]
    fn older_type_version_is_handed_to_impl() {
        // Versions at or below CURRENT_VERSION are not rejected by the wrapper;
        // backward compatibility is the impl's job, and Widget reads them fine.
        let mut buf: Vec<u8> = Vec::new();
        write_envelope(&mut buf, Widget::TYPE_ID, Widget::CURRENT_VERSION - 1).unwrap();
        lance_arrow::ipc::write_len_prefixed_bytes(&mut buf, &11u32.to_le_bytes()).unwrap();
        assert_eq!(
            deserialize_widget(&Bytes::from(buf)).hit(),
            Some(Widget { n: 11 })
        );
    }

    #[test]
    fn truncated_envelope_is_miss() {
        let bytes = serialize_widget(&Widget { n: 7 });
        let envelope_len =
            write_envelope(&mut Vec::<u8>::new(), Widget::TYPE_ID, Widget::CURRENT_VERSION)
                .unwrap();
        assert!(envelope_len < bytes.len());
        // Every proper prefix of the envelope — including cuts inside the
        // trailing type_version field — must be rejected as framing.
        for cut in 0..envelope_len {
            assert_eq!(
                miss_reason(&bytes.slice(..cut)),
                Some(CacheMissReason::InvalidEnvelope),
                "truncating to {cut} bytes should miss as InvalidEnvelope"
            );
        }
    }

    #[test]
    fn envelope_without_body_is_body_error() {
        // A complete envelope with nothing after it passes framing checks, so
        // the failure is attributed to the body.
        let mut buf: Vec<u8> = Vec::new();
        write_envelope(&mut buf, Widget::TYPE_ID, Widget::CURRENT_VERSION).unwrap();
        assert_eq!(
            miss_reason(&Bytes::from(buf)),
            Some(CacheMissReason::BodyError)
        );
    }

    #[test]
    fn body_decode_error_is_miss() {
        // Valid envelope, but the body is too short for the widget.
        let mut buf: Vec<u8> = Vec::new();
        write_envelope(&mut buf, Widget::TYPE_ID, Widget::CURRENT_VERSION).unwrap();
        buf.extend_from_slice(&(1u64).to_le_bytes());
        buf.push(0u8);
        assert_eq!(
            miss_reason(&Bytes::from(buf)),
            Some(CacheMissReason::BodyError)
        );
    }

    #[test]
    fn reader_exposes_envelope_version() {
        // type_version travels through the envelope to reader.version().
        let mut buf: Vec<u8> = Vec::new();
        let envelope_len = write_envelope(&mut buf, Widget::TYPE_ID, 7).unwrap();
        assert_eq!(envelope_len, buf.len());
        // A widget body so the codec can decode it.
        lance_arrow::ipc::write_len_prefixed_bytes(&mut buf, &5u32.to_le_bytes()).unwrap();
        let data = Bytes::from(buf);

        // Recover the version and body offset from the envelope itself rather
        // than passing them in by hand.
        let envelope = parse_envelope(&data).expect("envelope should parse");
        assert_eq!(envelope.type_id, Widget::TYPE_ID);
        assert_eq!(envelope.type_version, 7);
        assert_eq!(envelope.body_offset, envelope_len);

        let mut r = CacheEntryReader::new(&data, envelope.body_offset, envelope.type_version);
        assert_eq!(r.version(), 7);
        assert_eq!(r.read_raw().unwrap().as_ref(), 5u32.to_le_bytes());
    }

    #[test]
    fn codec_debug_shows_identity() {
        let codec = CacheCodec::from_impl::<Widget>();
        assert_eq!(
            format!("{codec:?}"),
            r#"CacheCodec { type_id: "test.Widget", version: 1, .. }"#
        );
    }
}