1use crate::format::{CacheMeta, LogicalSeq, Page, PageManifest};
11use crate::pager::PageBytes;
12use pf_core::cas::BlobStore;
13use pf_core::digest::Digest256;
14
15pub fn serialize_pages<I>(
20 blobs: &dyn BlobStore,
21 meta: CacheMeta,
22 pages: I,
23 logical_seqs: &[LogicalSeq],
24) -> pf_core::Result<Digest256>
25where
26 I: IntoIterator<Item = (u32, PageBytes)>,
27{
28 let mut manifest = PageManifest::new(meta);
29 for (ix, page) in pages {
30 if page.k.len() != meta.page_bytes() || page.v.len() != meta.page_bytes() {
31 return Err(pf_core::Error::Integrity(format!(
32 "serialize_pages ix={ix}: K/V len {}/{} ≠ expected {}",
33 page.k.len(),
34 page.v.len(),
35 meta.page_bytes()
36 )));
37 }
38 let k = blobs.put(&page.k)?;
39 let v = blobs.put(&page.v)?;
40 manifest.pages.push(Page { ix, k, v });
41 }
42 manifest.logical_seqs = logical_seqs.to_vec();
43 manifest.canonicalize();
44 blobs.put(&serde_json::to_vec(&manifest)?)
45}
46
47pub fn deserialize_pages(
52 blobs: &dyn BlobStore,
53 digest: &Digest256,
54) -> pf_core::Result<DeserializedCache> {
55 let bytes = blobs.get(digest)?;
56 let manifest: PageManifest = serde_json::from_slice(&bytes)?;
57 if manifest.layout != crate::format::LAYOUT_V1 {
58 return Err(pf_core::Error::Integrity(format!(
59 "expected layout {}, got {}",
60 crate::format::LAYOUT_V1,
61 manifest.layout
62 )));
63 }
64 let mut pages = Vec::with_capacity(manifest.pages.len());
65 for p in &manifest.pages {
66 let k = blobs.get(&p.k)?;
67 let v = blobs.get(&p.v)?;
68 if k.len() != manifest.meta.page_bytes() || v.len() != manifest.meta.page_bytes() {
69 return Err(pf_core::Error::Integrity(format!(
70 "deserialize_pages ix={}: K/V len {}/{} ≠ expected {}",
71 p.ix,
72 k.len(),
73 v.len(),
74 manifest.meta.page_bytes()
75 )));
76 }
77 pages.push((p.ix, PageBytes { k, v }));
78 }
79 Ok(DeserializedCache {
80 meta: manifest.meta,
81 pages,
82 logical_seqs: manifest.logical_seqs,
83 })
84}
85
86#[derive(Debug)]
88pub struct DeserializedCache {
89 pub meta: CacheMeta,
91 pub pages: Vec<(u32, PageBytes)>,
93 pub logical_seqs: Vec<LogicalSeq>,
95}
96
97#[cfg(test)]
98mod tests {
99 use super::*;
100 use crate::format::Dtype;
101 use crate::pager::{CachePager, SyntheticCachePager};
102 use pf_core::cas::MemBlobStore;
103
104 fn small_meta() -> CacheMeta {
105 CacheMeta {
106 page_size_tokens: 4,
107 n_layers: 2,
108 n_heads: 2,
109 head_dim: 4,
110 dtype: Dtype::Bf16,
111 }
112 }
113
114 fn dump(pager: &SyntheticCachePager) -> Vec<(u32, PageBytes)> {
115 pager
116 .occupied_pages()
117 .into_iter()
118 .map(|ix| (ix, pager.read_page(ix).unwrap()))
119 .collect()
120 }
121
122 #[test]
123 fn round_trip_byte_identical() {
124 let mut p = SyntheticCachePager::new(small_meta());
125 p.populate_synthetic(8, 42).unwrap();
126 let blobs = MemBlobStore::new();
127 let original = dump(&p);
128 let cid = serialize_pages(
129 &blobs,
130 p.meta(),
131 original.iter().cloned(),
132 &p.logical_seqs(),
133 )
134 .unwrap();
135
136 let back = deserialize_pages(&blobs, &cid).unwrap();
137 assert_eq!(back.meta, p.meta());
138 assert_eq!(back.pages, original);
139 assert_eq!(back.logical_seqs, p.logical_seqs());
140 }
141
142 #[test]
143 fn cow_dedupes_identical_pages_across_two_pagers() {
144 let mut a = SyntheticCachePager::new(small_meta());
145 let mut b = SyntheticCachePager::new(small_meta());
146 a.populate_synthetic(8, 1).unwrap();
147 b.populate_synthetic(8, 1).unwrap(); let blobs = MemBlobStore::new();
150 let _ = serialize_pages(&blobs, a.meta(), dump(&a), &a.logical_seqs()).unwrap();
151 let after_a = blobs.physical_bytes().unwrap();
152 let _ = serialize_pages(&blobs, b.meta(), dump(&b), &b.logical_seqs()).unwrap();
153 let after_b = blobs.physical_bytes().unwrap();
154
155 let growth = after_b - after_a;
158 let allowance = 1024;
159 assert!(
160 growth < allowance,
161 "second pager grew CAS by {growth} B (>{allowance}); CoW failing"
162 );
163 }
164
165 #[test]
166 fn cow_partial_divergence_only_stores_diff() {
167 let meta = small_meta();
170 let mut a = SyntheticCachePager::new(meta);
171 let mut b = SyntheticCachePager::new(meta);
172 a.populate_synthetic(8, 1).unwrap();
173 b.populate_synthetic(8, 1).unwrap();
174
175 let bad = PageBytes {
177 k: vec![0xAA; meta.page_bytes()],
178 v: vec![0xBB; meta.page_bytes()],
179 };
180 b.write_page(3, &bad).unwrap();
181
182 let blobs = MemBlobStore::new();
183 let _ = serialize_pages(&blobs, meta, dump(&a), &a.logical_seqs()).unwrap();
184 let after_a = blobs.physical_bytes().unwrap();
185 let _ = serialize_pages(&blobs, meta, dump(&b), &b.logical_seqs()).unwrap();
186 let after_b = blobs.physical_bytes().unwrap();
187
188 let growth = after_b - after_a;
189 let cap = 2 * (meta.page_bytes() as u64) + 2048;
191 assert!(growth < cap, "growth {growth} > cap {cap}");
192 }
193
194 #[test]
195 fn deserialize_rejects_wrong_layout() {
196 let blobs = MemBlobStore::new();
197 let bogus = serde_json::json!({
198 "layout": "some-other-layout",
199 "page_size_tokens": 4, "n_layers": 2, "n_heads": 2, "head_dim": 4,
200 "dtype": "bf16",
201 "pages": [], "logical_seqs": []
202 });
203 let cid = blobs.put(&serde_json::to_vec(&bogus).unwrap()).unwrap();
204 let err = deserialize_pages(&blobs, &cid).unwrap_err();
205 assert!(matches!(err, pf_core::Error::Integrity(_)));
206 }
207
208 #[test]
209 fn page_canonicalized_order_in_manifest() {
210 let meta = small_meta();
212 let mut pager = SyntheticCachePager::new(meta);
213 pager.populate_synthetic(4, 0).unwrap();
214 let blobs = MemBlobStore::new();
215 let mut reversed = dump(&pager);
217 reversed.reverse();
218 let cid = serialize_pages(&blobs, meta, reversed, &pager.logical_seqs()).unwrap();
219 let back = deserialize_pages(&blobs, &cid).unwrap();
220 let ixs: Vec<u32> = back.pages.iter().map(|(i, _)| *i).collect();
221 assert_eq!(ixs, vec![0, 1, 2, 3]);
222 }
223}