1use crate::format::{CacheMeta, LogicalSeq};
10use parking_lot::Mutex;
11use std::collections::BTreeMap;
12use std::sync::Arc;
13
/// Owned bytes of one cache page, held as two separate buffers.
///
/// Equality (derived `PartialEq`/`Eq`) is a byte-wise comparison of both buffers.
/// `SyntheticCachePager::write_page` rejects a page unless both buffers are exactly
/// `CacheMeta::page_bytes()` long.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PageBytes {
    /// The page's `k` buffer (presumably the key half of a K/V cache page — confirm).
    pub k: Vec<u8>,
    /// The page's `v` buffer (presumably the value half of a K/V cache page — confirm).
    pub v: Vec<u8>,
}
24
/// Page-granular access to a cache: enumerate and read pages, allocate and write
/// pages, and get/set the logical sequences that reference them.
///
/// Implementors must be `Send`.
///
/// NOTE(review): the per-method notes below are inferred from the signatures and from
/// `SyntheticCachePager`; confirm the intended contract before relying on anything
/// marked "presumably".
pub trait CachePager: Send {
    /// Returns the `CacheMeta` describing this cache.
    fn meta(&self) -> CacheMeta;

    /// Pauses the pager.
    ///
    /// Presumably called before pages are read out so the contents stay stable — TODO
    /// confirm. `SyntheticCachePager` only records a flag.
    fn pause(&mut self) -> pf_core::Result<()>;

    /// Undoes a previous [`CachePager::pause`].
    fn resume(&mut self) -> pf_core::Result<()>;

    /// Returns the indices of the pages that currently hold data.
    fn occupied_pages(&self) -> Vec<u32>;

    /// Returns the logical sequences currently known to the pager.
    fn logical_seqs(&self) -> Vec<LogicalSeq>;

    /// Returns a copy of the bytes stored at page index `ix`.
    ///
    /// # Errors
    ///
    /// `SyntheticCachePager` returns `pf_core::Error::Integrity` when no page is stored
    /// at `ix`; other implementors presumably do likewise.
    fn read_page(&self, ix: u32) -> pf_core::Result<PageBytes>;

    /// Reserves `n` page indices and returns them.
    ///
    /// In `SyntheticCachePager` the returned indices are consecutive and are not
    /// reported by [`CachePager::occupied_pages`] until they are written.
    fn allocate_pages(&mut self, n: usize) -> pf_core::Result<Vec<u32>>;

    /// Stores `page` at page index `ix`.
    ///
    /// Presumably `ix` should come from [`CachePager::allocate_pages`] — TODO confirm;
    /// `SyntheticCachePager` does not check this, only the buffer lengths.
    fn write_page(&mut self, ix: u32, page: &PageBytes) -> pf_core::Result<()>;

    /// Sets the pager's logical sequences to `seqs`.
    ///
    /// `SyntheticCachePager` replaces (does not merge with) the existing list.
    fn install_logical_seqs(&mut self, seqs: &[LogicalSeq]) -> pf_core::Result<()>;
}
62
/// In-memory `CachePager` whose pages live in a map behind a shared mutex.
///
/// The derived `Clone` clones the `Arc`, not the state: every clone observes and
/// mutates the same pages, logical sequences and allocation counter. Only `meta` is
/// held per handle.
#[derive(Clone, Debug)]
pub struct SyntheticCachePager {
    /// Mutable state shared by all clones of this handle.
    inner: Arc<Mutex<SynthInner>>,
    /// Cache description supplied to `new`; never modified by the visible code.
    meta: CacheMeta,
}
80
/// Mutable state of a `SyntheticCachePager`, always accessed through its mutex.
#[derive(Debug)]
struct SynthInner {
    /// Stored pages keyed by page index; the ordered map makes key iteration ascending.
    pages: BTreeMap<u32, PageBytes>,
    /// Logical sequences appended by `populate_synthetic` or replaced wholesale by
    /// `install_logical_seqs`.
    logical_seqs: Vec<LogicalSeq>,
    /// First index the next `allocate_pages` call will hand out.
    next_page_ix: u32,
    /// Set by `pause`, cleared by `resume`; not read anywhere in the visible code.
    paused: bool,
}
88
89impl SyntheticCachePager {
90 #[must_use]
92 pub fn new(meta: CacheMeta) -> Self {
93 Self {
94 meta,
95 inner: Arc::new(Mutex::new(SynthInner {
96 pages: BTreeMap::new(),
97 logical_seqs: Vec::new(),
98 next_page_ix: 0,
99 paused: false,
100 })),
101 }
102 }
103
104 pub fn populate_synthetic(&mut self, n_pages: u32, seed: u64) -> pf_core::Result<()> {
111 let mut g = self.inner.lock();
112 let bytes = self.meta.page_bytes();
113 for i in 0..n_pages {
114 let mut k = vec![0u8; bytes];
115 let mut v = vec![0u8; bytes];
116 fill_deterministic(&mut k, seed ^ u64::from(i) ^ 0xCACE_CAFE_CAFE);
117 fill_deterministic(&mut v, seed ^ u64::from(i) ^ 0xC0DE_C0DE_C0DE_C0DE);
118 g.pages.insert(i, PageBytes { k, v });
119 }
120 g.next_page_ix = n_pages;
121 g.logical_seqs.push(LogicalSeq {
122 id: format!("seq-seed-{seed:016x}"),
123 page_ixs: (0..n_pages).collect(),
124 fill_in_last_page: self.meta.page_size_tokens / 2,
125 });
126 Ok(())
127 }
128}
129
/// Overwrites every byte of `buf` with a pseudo-random stream that depends only on
/// `seed`.
///
/// The generator is SplitMix64: a counter advanced by the golden-ratio increment and
/// passed through the SplitMix64 finalizer. The counter is advanced once before the
/// loop and once more per word, so the first word emitted comes from
/// `seed + 2 * GAMMA`. Words are written little-endian, eight bytes at a time; a
/// trailing chunk shorter than eight bytes receives the low-order bytes of its word.
fn fill_deterministic(buf: &mut [u8], seed: u64) {
    const GAMMA: u64 = 0x9E37_79B9_7F4A_7C15;

    /// SplitMix64 output finalizer.
    fn finalize(state: u64) -> u64 {
        let a = (state ^ (state >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        let b = (a ^ (a >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        b ^ (b >> 31)
    }

    let mut state = seed.wrapping_add(GAMMA);
    for chunk in buf.chunks_mut(8) {
        state = state.wrapping_add(GAMMA);
        let word = finalize(state).to_le_bytes();
        // `zip` stops at the shorter side, which truncates the word for a short tail.
        for (dst, src) in chunk.iter_mut().zip(word.iter()) {
            *dst = *src;
        }
    }
}
144
145impl CachePager for SyntheticCachePager {
146 fn meta(&self) -> CacheMeta {
147 self.meta
148 }
149
150 fn pause(&mut self) -> pf_core::Result<()> {
151 self.inner.lock().paused = true;
152 Ok(())
153 }
154
155 fn resume(&mut self) -> pf_core::Result<()> {
156 self.inner.lock().paused = false;
157 Ok(())
158 }
159
160 fn occupied_pages(&self) -> Vec<u32> {
161 self.inner.lock().pages.keys().copied().collect()
162 }
163
164 fn logical_seqs(&self) -> Vec<LogicalSeq> {
165 self.inner.lock().logical_seqs.clone()
166 }
167
168 fn read_page(&self, ix: u32) -> pf_core::Result<PageBytes> {
169 self.inner
170 .lock()
171 .pages
172 .get(&ix)
173 .cloned()
174 .ok_or_else(|| pf_core::Error::Integrity(format!("no page at ix {ix}")))
175 }
176
177 fn allocate_pages(&mut self, n: usize) -> pf_core::Result<Vec<u32>> {
178 let mut g = self.inner.lock();
179 let start = g.next_page_ix;
180 let n_u32 = u32::try_from(n).map_err(|e| {
181 pf_core::Error::Integrity(format!("allocate_pages: {n} too large: {e}"))
182 })?;
183 g.next_page_ix = g.next_page_ix.checked_add(n_u32).ok_or_else(|| {
184 pf_core::Error::Integrity("allocate_pages: page-index overflow".into())
185 })?;
186 Ok((start..start + n_u32).collect())
187 }
188
189 fn write_page(&mut self, ix: u32, page: &PageBytes) -> pf_core::Result<()> {
190 let expected = self.meta.page_bytes();
191 if page.k.len() != expected || page.v.len() != expected {
192 return Err(pf_core::Error::Integrity(format!(
193 "write_page ix={ix}: K/V len {}/{} ≠ expected {expected}",
194 page.k.len(),
195 page.v.len()
196 )));
197 }
198 self.inner.lock().pages.insert(ix, page.clone());
199 Ok(())
200 }
201
202 fn install_logical_seqs(&mut self, seqs: &[LogicalSeq]) -> pf_core::Result<()> {
203 self.inner.lock().logical_seqs = seqs.to_vec();
204 Ok(())
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::Dtype;

    /// Cache description shared by every test in this module.
    fn small_meta() -> CacheMeta {
        CacheMeta {
            page_size_tokens: 4,
            n_layers: 2,
            n_heads: 2,
            head_dim: 4,
            dtype: Dtype::Bf16,
        }
    }

    /// Builds a pager over `small_meta()` and populates `n_pages` pages from `seed`.
    fn populated(n_pages: u32, seed: u64) -> SyntheticCachePager {
        let mut pager = SyntheticCachePager::new(small_meta());
        pager.populate_synthetic(n_pages, seed).unwrap();
        pager
    }

    /// Populated pages are listed in order and repeated reads return equal bytes.
    #[test]
    fn populate_then_read_round_trips_bytes() {
        let pager = populated(8, 42);
        let expected_ixs: Vec<u32> = (0..8).collect();
        assert_eq!(pager.occupied_pages(), expected_ixs);

        let first_read = pager.read_page(0).unwrap();
        let second_read = pager.read_page(0).unwrap();
        assert_eq!(first_read, second_read, "read_page deterministic");
    }

    /// Two pagers populated from the same seed hold identical pages.
    #[test]
    fn same_seed_produces_byte_identical_pages() {
        let left = populated(4, 7);
        let right = populated(4, 7);
        for ix in 0..4 {
            let from_left = left.read_page(ix).unwrap();
            let from_right = right.read_page(ix).unwrap();
            assert_eq!(from_left, from_right);
        }
    }

    /// Different seeds produce different bytes for the same page index.
    #[test]
    fn different_seed_diverges() {
        let left = populated(4, 7);
        let right = populated(4, 9);
        let from_left = left.read_page(0).unwrap();
        let from_right = right.read_page(0).unwrap();
        assert_ne!(from_left, from_right);
    }

    /// A page whose buffers have the wrong length is rejected.
    #[test]
    fn write_page_validates_length() {
        let mut pager = SyntheticCachePager::new(small_meta());
        let allocated = pager.allocate_pages(1).unwrap();
        let undersized = PageBytes {
            k: vec![0u8; 1],
            v: vec![0u8; 1],
        };
        let outcome = pager.write_page(allocated[0], &undersized);
        assert!(outcome.is_err());
    }

    /// Successive allocations continue where the previous one stopped.
    #[test]
    fn allocate_pages_returns_monotonic_indices() {
        let mut pager = SyntheticCachePager::new(small_meta());
        let first_batch = pager.allocate_pages(3).unwrap();
        let second_batch = pager.allocate_pages(2).unwrap();
        assert_eq!(first_batch, vec![0, 1, 2]);
        assert_eq!(second_batch, vec![3, 4]);
    }
}