// pf_cache/pager.rs
1// SPDX-License-Identifier: MIT
2//! Engine-agnostic paged-cache interface.
3//!
4//! Every adapter (vLLM, SGLang, …) implements [`CachePager`]. The
5//! [`SyntheticCachePager`] in this module is the in-process implementation
6//! that lets the test suite (and the macOS build host) round-trip the cache
7//! layer end-to-end without booting an inference engine.
8
9use crate::format::{CacheMeta, LogicalSeq};
10use parking_lot::Mutex;
11use std::collections::BTreeMap;
12use std::sync::Arc;
13
/// One physical (K, V) page as a pair of opaque byte buffers, both of length
/// `meta.page_bytes()` (enforced on write by `write_page`). The bytes are
/// never interpreted here — they are whatever the engine handed us, so any
/// dtype/layout passes through unchanged.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PageBytes {
    /// Raw K-tensor bytes for this page.
    pub k: Vec<u8>,
    /// Raw V-tensor bytes for this page.
    pub v: Vec<u8>,
}
24
/// Engine-agnostic paged-cache interface. Implementations are *not* required
/// to be thread-safe for `read_page` / `write_page` — the snapshot
/// orchestrator holds a `&mut self` for the duration of capture/restore.
pub trait CachePager: Send {
    /// Static cache metadata (page size, layer count, dtype, …).
    fn meta(&self) -> CacheMeta;

    /// Pause the engine so the page table is stable while pages are read.
    /// May be a no-op for adapters that already hold a write-lock during
    /// snapshot.
    fn pause(&mut self) -> pf_core::Result<()>;

    /// Resume the engine after a prior [`Self::pause`] / restore.
    fn resume(&mut self) -> pf_core::Result<()>;

    /// List physical-page indices that currently hold valid data.
    fn occupied_pages(&self) -> Vec<u32>;

    /// Snapshot of every live logical sequence's page-table mapping.
    fn logical_seqs(&self) -> Vec<LogicalSeq>;

    /// Read one (K, V) page out of the engine. Implementations return raw
    /// bytes of length `meta().page_bytes()` for each of K and V.
    fn read_page(&self, ix: u32) -> pf_core::Result<PageBytes>;

    /// Allocate `n` fresh physical-page slots and return their indices in
    /// the order they should be filled. Restore writes pages in this order.
    fn allocate_pages(&mut self, n: usize) -> pf_core::Result<Vec<u32>>;

    /// Write one (K, V) page into the engine at the given physical-page
    /// index. The index MUST have been returned by a recent
    /// [`Self::allocate_pages`] call.
    fn write_page(&mut self, ix: u32, page: &PageBytes) -> pf_core::Result<()>;

    /// Re-install the per-request logical-seq → page-list mapping after
    /// restore. Called once, after all pages have been written.
    fn install_logical_seqs(&mut self, seqs: &[LogicalSeq]) -> pf_core::Result<()>;
}
62
63// ----------------------- SyntheticCachePager -----------------------
64
/// In-memory [`CachePager`] used by every test in this crate and by
/// `examples/02-twelve-way-parallel/` (when that lands).
///
/// Stores pages in a `BTreeMap<u32, PageBytes>` keyed by physical-page index.
/// `allocate_pages` returns monotonically increasing indices; `write_page`
/// inserts; `read_page` reads.
///
/// `Clone`-able — the inner state is `Arc<Mutex<…>>` so two clones share the
/// same logical engine. Useful for "fork the in-memory engine; mutate one
/// branch; serialize each; assert dedup."
#[derive(Clone, Debug)]
pub struct SyntheticCachePager {
    /// Shared mutable engine state; every clone points at the same inner.
    inner: Arc<Mutex<SynthInner>>,
    /// Static metadata; set at construction and never mutated afterwards.
    meta: CacheMeta,
}
80
#[derive(Debug)]
struct SynthInner {
    // Physical page store keyed by page index. A BTreeMap (vs HashMap) keeps
    // `occupied_pages` output sorted ascending for free.
    pages: BTreeMap<u32, PageBytes>,
    // Mapping returned by `logical_seqs` and replaced by
    // `install_logical_seqs` / appended by `populate_synthetic`.
    logical_seqs: Vec<LogicalSeq>,
    // Next index `allocate_pages` will hand out; only ever advanced by
    // allocation (and set by `populate_synthetic`).
    next_page_ix: u32,
    // Recorded by `pause` / `resume`. The synthetic engine has no background
    // activity, so nothing else reads this flag.
    paused: bool,
}
88
89impl SyntheticCachePager {
90    /// Construct an empty engine with the given static metadata.
91    #[must_use]
92    pub fn new(meta: CacheMeta) -> Self {
93        Self {
94            meta,
95            inner: Arc::new(Mutex::new(SynthInner {
96                pages: BTreeMap::new(),
97                logical_seqs: Vec::new(),
98                next_page_ix: 0,
99                paused: false,
100            })),
101        }
102    }
103
104    /// Pre-populate the engine with `n_pages` deterministic pages and one
105    /// logical sequence covering them all. Useful for tests / benches.
106    ///
107    /// `seed` mixes into the per-page fill so different seeds produce
108    /// different pages (driving CAS divergence) while the same seed produces
109    /// byte-identical pages (driving CAS dedup).
110    pub fn populate_synthetic(&mut self, n_pages: u32, seed: u64) -> pf_core::Result<()> {
111        let mut g = self.inner.lock();
112        let bytes = self.meta.page_bytes();
113        for i in 0..n_pages {
114            let mut k = vec![0u8; bytes];
115            let mut v = vec![0u8; bytes];
116            fill_deterministic(&mut k, seed ^ u64::from(i) ^ 0xCACE_CAFE_CAFE);
117            fill_deterministic(&mut v, seed ^ u64::from(i) ^ 0xC0DE_C0DE_C0DE_C0DE);
118            g.pages.insert(i, PageBytes { k, v });
119        }
120        g.next_page_ix = n_pages;
121        g.logical_seqs.push(LogicalSeq {
122            id: format!("seq-seed-{seed:016x}"),
123            page_ixs: (0..n_pages).collect(),
124            fill_in_last_page: self.meta.page_size_tokens / 2,
125        });
126        Ok(())
127    }
128}
129
/// SplitMix64-style deterministic fill — the same generator we use in
/// `pf-core::fixture`. Cheap, reproducible, no crate dep.
///
/// The buffer is written in 8-byte little-endian words; a trailing partial
/// chunk receives the leading bytes of the final word.
fn fill_deterministic(buf: &mut [u8], seed: u64) {
    /// Weyl-sequence increment (the SplitMix64 "golden gamma").
    const GAMMA: u64 = 0x9E37_79B9_7F4A_7C15;

    // SplitMix64 output finalizer: two xor-shift-multiply rounds plus a
    // final xor-shift.
    let mix = |mut z: u64| {
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    };

    // Note: the state is advanced once here and once more before the first
    // word, matching the original stream exactly.
    let mut state = seed.wrapping_add(GAMMA);
    for dst in buf.chunks_mut(8) {
        state = state.wrapping_add(GAMMA);
        let word = mix(state).to_le_bytes();
        dst.copy_from_slice(&word[..dst.len()]);
    }
}
144
145impl CachePager for SyntheticCachePager {
146    fn meta(&self) -> CacheMeta {
147        self.meta
148    }
149
150    fn pause(&mut self) -> pf_core::Result<()> {
151        self.inner.lock().paused = true;
152        Ok(())
153    }
154
155    fn resume(&mut self) -> pf_core::Result<()> {
156        self.inner.lock().paused = false;
157        Ok(())
158    }
159
160    fn occupied_pages(&self) -> Vec<u32> {
161        self.inner.lock().pages.keys().copied().collect()
162    }
163
164    fn logical_seqs(&self) -> Vec<LogicalSeq> {
165        self.inner.lock().logical_seqs.clone()
166    }
167
168    fn read_page(&self, ix: u32) -> pf_core::Result<PageBytes> {
169        self.inner
170            .lock()
171            .pages
172            .get(&ix)
173            .cloned()
174            .ok_or_else(|| pf_core::Error::Integrity(format!("no page at ix {ix}")))
175    }
176
177    fn allocate_pages(&mut self, n: usize) -> pf_core::Result<Vec<u32>> {
178        let mut g = self.inner.lock();
179        let start = g.next_page_ix;
180        let n_u32 = u32::try_from(n).map_err(|e| {
181            pf_core::Error::Integrity(format!("allocate_pages: {n} too large: {e}"))
182        })?;
183        g.next_page_ix = g.next_page_ix.checked_add(n_u32).ok_or_else(|| {
184            pf_core::Error::Integrity("allocate_pages: page-index overflow".into())
185        })?;
186        Ok((start..start + n_u32).collect())
187    }
188
189    fn write_page(&mut self, ix: u32, page: &PageBytes) -> pf_core::Result<()> {
190        let expected = self.meta.page_bytes();
191        if page.k.len() != expected || page.v.len() != expected {
192            return Err(pf_core::Error::Integrity(format!(
193                "write_page ix={ix}: K/V len {}/{} ≠ expected {expected}",
194                page.k.len(),
195                page.v.len()
196            )));
197        }
198        self.inner.lock().pages.insert(ix, page.clone());
199        Ok(())
200    }
201
202    fn install_logical_seqs(&mut self, seqs: &[LogicalSeq]) -> pf_core::Result<()> {
203        self.inner.lock().logical_seqs = seqs.to_vec();
204        Ok(())
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::Dtype;

    /// Deliberately tiny metadata so each test touches only a few dozen
    /// bytes per page.
    fn small_meta() -> CacheMeta {
        CacheMeta {
            page_size_tokens: 4,
            n_layers: 2,
            n_heads: 2,
            head_dim: 4,
            dtype: Dtype::Bf16,
        }
    }

    #[test]
    fn populate_then_read_round_trips_bytes() {
        let mut pager = SyntheticCachePager::new(small_meta());
        pager.populate_synthetic(8, 42).unwrap();
        assert_eq!(pager.occupied_pages(), (0..8).collect::<Vec<_>>());
        let first = pager.read_page(0).unwrap();
        let second = pager.read_page(0).unwrap();
        assert_eq!(first, second, "read_page deterministic");
    }

    #[test]
    fn same_seed_produces_byte_identical_pages() {
        let mut left = SyntheticCachePager::new(small_meta());
        let mut right = SyntheticCachePager::new(small_meta());
        left.populate_synthetic(4, 7).unwrap();
        right.populate_synthetic(4, 7).unwrap();
        for ix in 0..4 {
            let (l, r) = (left.read_page(ix).unwrap(), right.read_page(ix).unwrap());
            assert_eq!(l, r);
        }
    }

    #[test]
    fn different_seed_diverges() {
        let mut left = SyntheticCachePager::new(small_meta());
        let mut right = SyntheticCachePager::new(small_meta());
        left.populate_synthetic(4, 7).unwrap();
        right.populate_synthetic(4, 9).unwrap();
        assert_ne!(left.read_page(0).unwrap(), right.read_page(0).unwrap());
    }

    #[test]
    fn write_page_validates_length() {
        let mut pager = SyntheticCachePager::new(small_meta());
        let ixs = pager.allocate_pages(1).unwrap();
        // One byte per buffer is far short of meta.page_bytes().
        let undersized = PageBytes {
            k: vec![0u8; 1],
            v: vec![0u8; 1],
        };
        assert!(pager.write_page(ixs[0], &undersized).is_err());
    }

    #[test]
    fn allocate_pages_returns_monotonic_indices() {
        let mut pager = SyntheticCachePager::new(small_meta());
        let first = pager.allocate_pages(3).unwrap();
        let second = pager.allocate_pages(2).unwrap();
        assert_eq!(first, vec![0, 1, 2]);
        assert_eq!(second, vec![3, 4]);
    }
}