Skip to main content

pf_cache/
serialize.rs

// SPDX-License-Identifier: MIT
//! Serialize / deserialize a paged KV cache via a [`BlobStore`].
//!
//! Per `agent_docs/cache-layer.md`, the K and V buffers of each physical page
//! are content-addressed *independently*, so a fork that mutates only V (e.g.
//! a one-token decode step) shares its K page with siblings. The
//! [`pf_core::cas::FsBlobStore`] does the zstd-19 compression at rest, so we
//! pass raw bytes here.
9
10use crate::format::{CacheMeta, LogicalSeq, Page, PageManifest};
11use crate::pager::PageBytes;
12use pf_core::cas::BlobStore;
13use pf_core::digest::Digest256;
14
15/// Serialize a list of (ix, K-bytes, V-bytes) page tuples + the
16/// per-sequence logical mapping into a [`PageManifest`] and persist every
17/// blob through `blobs`. Returns the digest of the manifest itself, ready
18/// to drop into the `.pfimg` `cache.manifest` field.
19pub fn serialize_pages<I>(
20    blobs: &dyn BlobStore,
21    meta: CacheMeta,
22    pages: I,
23    logical_seqs: &[LogicalSeq],
24) -> pf_core::Result<Digest256>
25where
26    I: IntoIterator<Item = (u32, PageBytes)>,
27{
28    let mut manifest = PageManifest::new(meta);
29    for (ix, page) in pages {
30        if page.k.len() != meta.page_bytes() || page.v.len() != meta.page_bytes() {
31            return Err(pf_core::Error::Integrity(format!(
32                "serialize_pages ix={ix}: K/V len {}/{} ≠ expected {}",
33                page.k.len(),
34                page.v.len(),
35                meta.page_bytes()
36            )));
37        }
38        let k = blobs.put(&page.k)?;
39        let v = blobs.put(&page.v)?;
40        manifest.pages.push(Page { ix, k, v });
41    }
42    manifest.logical_seqs = logical_seqs.to_vec();
43    manifest.canonicalize();
44    blobs.put(&serde_json::to_vec(&manifest)?)
45}
46
47/// Inverse of [`serialize_pages`]. Loads the [`PageManifest`] at `digest`,
48/// fetches every K/V blob, and yields `(ix, PageBytes)` pairs in canonical
49/// order along with the logical-seqs and metadata needed to rebuild the
50/// engine's page table.
51pub fn deserialize_pages(
52    blobs: &dyn BlobStore,
53    digest: &Digest256,
54) -> pf_core::Result<DeserializedCache> {
55    let bytes = blobs.get(digest)?;
56    let manifest: PageManifest = serde_json::from_slice(&bytes)?;
57    if manifest.layout != crate::format::LAYOUT_V1 {
58        return Err(pf_core::Error::Integrity(format!(
59            "expected layout {}, got {}",
60            crate::format::LAYOUT_V1,
61            manifest.layout
62        )));
63    }
64    let mut pages = Vec::with_capacity(manifest.pages.len());
65    for p in &manifest.pages {
66        let k = blobs.get(&p.k)?;
67        let v = blobs.get(&p.v)?;
68        if k.len() != manifest.meta.page_bytes() || v.len() != manifest.meta.page_bytes() {
69            return Err(pf_core::Error::Integrity(format!(
70                "deserialize_pages ix={}: K/V len {}/{} ≠ expected {}",
71                p.ix,
72                k.len(),
73                v.len(),
74                manifest.meta.page_bytes()
75            )));
76        }
77        pages.push((p.ix, PageBytes { k, v }));
78    }
79    Ok(DeserializedCache {
80        meta: manifest.meta,
81        pages,
82        logical_seqs: manifest.logical_seqs,
83    })
84}
85
86/// Output of [`deserialize_pages`].
87#[derive(Debug)]
88pub struct DeserializedCache {
89    /// Static metadata exactly as it was at capture time.
90    pub meta: CacheMeta,
91    /// `(physical_ix, page_bytes)` pairs in canonical (ix-ascending) order.
92    pub pages: Vec<(u32, PageBytes)>,
93    /// Logical sequences in canonical (id-ascending) order.
94    pub logical_seqs: Vec<LogicalSeq>,
95}
96
#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::Dtype;
    use crate::pager::{CachePager, SyntheticCachePager};
    use pf_core::cas::MemBlobStore;

    /// Deliberately tiny cache geometry shared by every test.
    fn small_meta() -> CacheMeta {
        CacheMeta {
            page_size_tokens: 4,
            n_layers: 2,
            n_heads: 2,
            head_dim: 4,
            dtype: Dtype::Bf16,
        }
    }

    /// Reads every occupied page out of `pager` as `(ix, bytes)` pairs.
    fn dump(pager: &SyntheticCachePager) -> Vec<(u32, PageBytes)> {
        let mut out = Vec::new();
        for ix in pager.occupied_pages() {
            let bytes = pager.read_page(ix).unwrap();
            out.push((ix, bytes));
        }
        out
    }

    /// Serializes `pager` into `blobs` under `meta` and reports the store's
    /// physical size afterwards.
    fn persist_and_measure(
        blobs: &MemBlobStore,
        meta: CacheMeta,
        pager: &SyntheticCachePager,
    ) -> u64 {
        let _cid = serialize_pages(blobs, meta, dump(pager), &pager.logical_seqs()).unwrap();
        blobs.physical_bytes().unwrap()
    }

    #[test]
    fn round_trip_byte_identical() {
        let mut pager = SyntheticCachePager::new(small_meta());
        pager.populate_synthetic(8, 42).unwrap();
        let expected_pages = dump(&pager);

        let store = MemBlobStore::new();
        let manifest_cid = serialize_pages(
            &store,
            pager.meta(),
            expected_pages.clone(),
            &pager.logical_seqs(),
        )
        .unwrap();

        let restored = deserialize_pages(&store, &manifest_cid).unwrap();
        assert_eq!(restored.meta, pager.meta());
        assert_eq!(restored.pages, expected_pages);
        assert_eq!(restored.logical_seqs, pager.logical_seqs());
    }

    #[test]
    fn cow_dedupes_identical_pages_across_two_pagers() {
        // Same seed for both pagers → identical pages.
        let mut first = SyntheticCachePager::new(small_meta());
        first.populate_synthetic(8, 1).unwrap();
        let mut second = SyntheticCachePager::new(small_meta());
        second.populate_synthetic(8, 1).unwrap();

        let store = MemBlobStore::new();
        let size_after_first = persist_and_measure(&store, first.meta(), &first);
        let size_after_second = persist_and_measure(&store, second.meta(), &second);

        // Two pagers with byte-identical pages → only the second manifest
        // (and the trivially-different logical-seq id) should grow the store.
        let growth = size_after_second - size_after_first;
        let allowance = 1024;
        assert!(
            growth < allowance,
            "second pager grew CAS by {growth} B (>{allowance}); CoW failing"
        );
    }

    #[test]
    fn cow_partial_divergence_only_stores_diff() {
        // Pagers differing in ONE page should add ~2 page-blobs to CAS
        // (one new K, one new V), not 8 × 2.
        let meta = small_meta();
        let mut baseline = SyntheticCachePager::new(meta);
        baseline.populate_synthetic(8, 1).unwrap();
        let mut diverged = SyntheticCachePager::new(meta);
        diverged.populate_synthetic(8, 1).unwrap();

        // Overwrite page 3 in the diverged pager only.
        let replacement = PageBytes {
            k: vec![0xAA; meta.page_bytes()],
            v: vec![0xBB; meta.page_bytes()],
        };
        diverged.write_page(3, &replacement).unwrap();

        let store = MemBlobStore::new();
        let size_after_baseline = persist_and_measure(&store, meta, &baseline);
        let size_after_diverged = persist_and_measure(&store, meta, &diverged);

        let growth = size_after_diverged - size_after_baseline;
        // Generous: 2 page blobs (K+V) + manifest JSON.
        let cap = 2 * (meta.page_bytes() as u64) + 2048;
        assert!(growth < cap, "growth {growth} > cap {cap}");
    }

    #[test]
    fn deserialize_rejects_wrong_layout() {
        let store = MemBlobStore::new();
        let bogus = serde_json::json!({
            "layout": "some-other-layout",
            "page_size_tokens": 4, "n_layers": 2, "n_heads": 2, "head_dim": 4,
            "dtype": "bf16",
            "pages": [], "logical_seqs": []
        });
        let encoded = serde_json::to_vec(&bogus).unwrap();
        let cid = store.put(&encoded).unwrap();

        let err = deserialize_pages(&store, &cid).unwrap_err();
        assert!(matches!(err, pf_core::Error::Integrity(_)));
    }

    #[test]
    fn page_canonicalized_order_in_manifest() {
        // Insertion order shouldn't matter; canonicalize() sorts by ix.
        let meta = small_meta();
        let mut pager = SyntheticCachePager::new(meta);
        pager.populate_synthetic(4, 0).unwrap();

        // Feed the pages in reverse iteration order.
        let reversed: Vec<_> = dump(&pager).into_iter().rev().collect();

        let store = MemBlobStore::new();
        let cid = serialize_pages(&store, meta, reversed, &pager.logical_seqs()).unwrap();
        let restored = deserialize_pages(&store, &cid).unwrap();

        let order: Vec<u32> = restored.pages.into_iter().map(|(ix, _)| ix).collect();
        assert_eq!(order, [0, 1, 2, 3]);
    }
}