1use crate::{Data, Limits, PageStorage, PageStorageInfo, Storage, dividedstg, util};
2use dividedstg::{DividedStg, FD, FD_SIZE};
3use std::collections::BTreeSet;
4use std::sync::Arc;
5
/// Page storage that keeps pages in a set of files managed by a [`DividedStg`].
///
/// File 0 (`PN_FILE`) holds a header followed by a table with one 8-byte entry per
/// page number; files `1..=sizes` each hold the pages of one size class.
pub struct BlockPageStg {
    /// Underlying divided storage that provides the files.
    pub ds: DividedStg,
    /// Next page number handed out by `new_page` when no free page is available.
    alloc_pn: u64,
    /// Head of the persisted chain of free page numbers (`NOT_PN` when the chain is empty).
    first_free_pn: u64,
    /// File descriptors: index 0 is the page-number file, the rest are one per size class.
    fd: Vec<FD>,
    /// Page numbers dropped since the last `save`; they are handed out again by `new_page`
    /// or moved onto the persisted free chain by `save`.
    free_pn: BTreeSet<u64>,
    /// True when the header has to be rewritten by the next `save`.
    header_dirty: bool,
    /// True when the underlying storage was empty at construction time.
    is_new: bool,
    /// Size-class parameters (block capacity, maximum division, number of classes).
    psi: SizeInfo,
    /// Size in bytes of the header: `HEADER_SIZE` plus one saved descriptor per size class.
    header_size: u64,
    /// Buffer of zero bytes, one maximum-size page long, used to blank the tail of a page
    /// slot when the page shrinks.
    zbytes: Data,
}
28
29const PN_FILE: usize = 0; const NOT_PN: u64 = u64::MAX >> 16; const PAGE_HSIZE: usize = 8; const HEADER_SIZE: usize = 24; impl BlockPageStg {
35 pub fn new(stg: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
37 let is_new = stg.size() == 0;
38
39 let sizes = lim.page_sizes;
40 let max_div = lim.max_div;
41 let ds = DividedStg::new(stg, lim.blk_cap);
42 let blk_cap = ds.blk_cap as usize;
43
44 let mut s = Self {
45 ds,
46 alloc_pn: 0,
47 first_free_pn: NOT_PN,
48 fd: Vec::new(),
49 free_pn: BTreeSet::default(),
50 header_dirty: true,
51 is_new,
52 psi: SizeInfo {
53 blk_cap,
54 max_div,
55 sizes,
56 },
57 header_size: 0,
58 zbytes: Data::default(),
59 };
60
61 if is_new {
62 for _i in 0..sizes + 1 {
63 s.fd.push(s.ds.new_file());
64 }
65 s.ds.set_root(&s.fd[0]);
66 } else {
67 s.read_header();
68 }
69
70 assert!(
72 s.psi.max_size_page() <= u16::MAX as usize,
73 "Max page size is 65535"
74 );
75
76 s.header_size = (HEADER_SIZE + s.psi.sizes * FD_SIZE) as u64;
77 s.zbytes = Arc::new(vec![0; s.psi.max_size_page()]);
78
79 #[cfg(feature = "log")]
80 println!("bps new alloc={:?}", &s.allocs());
81
82 Box::new(s)
83 }
84
85 fn read_header(&mut self) {
87 self.fd.clear();
88 self.fd.push(self.ds.get_root());
89
90 let mut buf = [0; HEADER_SIZE];
91 self.read(PN_FILE, 0, &mut buf);
92 self.alloc_pn = util::getu64(&buf, 0);
93 self.first_free_pn = util::getu64(&buf, 8);
94
95 self.psi.max_div = util::get(&buf, 16, 4) as usize;
96 self.psi.sizes = util::get(&buf, 20, 4) as usize;
97
98 let sizes = self.psi.sizes;
99 let mut buf = vec![0; FD_SIZE * sizes];
100 self.read(PN_FILE, HEADER_SIZE as u64, &mut buf);
101
102 for fx in 0..sizes {
103 let off = fx * FD_SIZE;
104 self.fd.push(self.ds.load_fd(&buf[off..]));
105 }
106 self.header_dirty = false;
107 }
108
109 fn write_header(&mut self) {
111 let mut buf = vec![0; self.header_size as usize];
112 util::setu64(&mut buf, self.alloc_pn);
113 util::setu64(&mut buf[8..], self.first_free_pn);
114 util::set(&mut buf, 16, self.psi.max_div as u64, 4);
115 util::set(&mut buf, 20, self.psi.sizes as u64, 4);
116
117 for fx in 0..self.psi.sizes {
118 let off = HEADER_SIZE + fx * FD_SIZE;
119 self.ds.save_fd(&self.fd[fx + 1], &mut buf[off..]);
120 }
121 self.write(PN_FILE, 0, &buf);
122 self.header_dirty = false;
123
124 #[cfg(feature = "log")]
125 println!("bps write_header allocs={:?}", &self.allocs());
126 }
127
128 #[cfg(feature = "log")]
129 fn allocs(&self) -> Vec<u64> {
130 (0..self.psi.sizes() + 1).map(|x| self.alloc(x)).collect()
131 }
132
133 fn page_size(&self, fx: usize) -> u64 {
135 (self.psi.size(fx) + PAGE_HSIZE) as u64
136 }
137
138 fn fsize(&self, fx: usize) -> u64 {
140 let size = self.fd[fx].size();
141 if fx == 0 && size < self.header_size {
142 self.header_size
143 } else {
144 size
145 }
146 }
147
148 fn alloc(&self, fx: usize) -> u64 {
150 let size = self.fsize(fx);
151 if fx == 0 {
152 (size - self.header_size) / 8
153 } else {
154 let ps = self.page_size(fx);
155 size.div_ceil(ps)
156 }
157 }
158
159 fn free_page(&mut self, fx: usize, ix: u64) {
161 if fx != 0 {
162 let last = self.alloc(fx) - 1;
163 let ps = self.page_size(fx);
164 if last != ix {
165 let mut buf = vec![0; ps as usize];
166 self.read(fx, last * ps, &mut buf);
167 let pn = util::getu64(&buf, 0);
168 let (fx1, _size, ix1) = self.get_pn_info(pn);
169 assert!(fx1 == fx && ix1 == last);
170 self.update_ix(pn, ix);
171 self.write_data(fx, ix * ps, Arc::new(buf));
172 }
173 self.truncate(fx, last * ps);
174 }
175 }
176
177 fn set_pn_info(&mut self, pn: u64, size: usize, ix: u64) {
179 let off = self.header_size + pn * 8;
180 let eof = self.fsize(PN_FILE);
181 if off > eof {
182 self.clear(PN_FILE, eof, off - eof);
183 }
184 let mut buf = [0; 8];
185 util::set(&mut buf, 0, ix, 6);
186 util::set(&mut buf, 6, size as u64, 2);
187 self.write(PN_FILE, off, &buf);
188 }
189
190 fn get_pn_info(&self, pn: u64) -> (usize, usize, u64) {
192 let off = self.header_size + pn * 8;
193 if off >= self.fsize(0) {
194 return (0, 0, 0);
195 }
196 let mut buf = [0; 8];
197 self.read(PN_FILE, off, &mut buf);
198 let ix = util::get(&buf, 0, 6);
199 let size = util::get(&buf, 6, 2) as usize;
200 let fx = if size == 0 { 0 } else { self.psi.index(size) };
201 (fx, size, ix)
202 }
203
204 fn update_ix(&mut self, pn: u64, ix: u64) {
206 let off = self.header_size + pn * 8;
207 self.write(PN_FILE, off, &ix.to_le_bytes()[0..6]);
208 }
209
210 fn clear(&mut self, fx: usize, off: u64, n: u64) {
212 let z = Arc::new(vec![0; n as usize]);
213 self.write_data(fx, off, z);
214 }
215
216 fn write(&mut self, fx: usize, off: u64, data: &[u8]) {
218 let data = Arc::new(data.to_vec());
219 self.write_data(fx, off, data);
220 }
221
222 fn write_data(&mut self, fx: usize, off: u64, data: Data) {
224 let n = data.len();
225 self.write_data_n(fx, off, data, n);
226 }
227
228 fn write_data_n(&mut self, fx: usize, off: u64, data: Data, n: usize) {
230 self.ds.write_data(&mut self.fd[fx], off, data, n);
231 self.save_fd(fx);
232 }
233
234 fn truncate(&mut self, fx: usize, off: u64) {
236 self.ds.truncate(&mut self.fd[fx], off);
237 self.save_fd(fx);
238 }
239
240 fn save_fd(&mut self, fx: usize) {
242 let fd = &mut self.fd[fx];
243 if fd.changed {
244 fd.changed = false;
245 self.header_dirty = true;
246 if fx == 0 {
247 self.ds.set_root(fd);
248 }
249 }
250 }
251
252 fn read(&self, fx: usize, off: u64, data: &mut [u8]) {
254 self.ds.read(&self.fd[fx], off, data);
255 }
256}
257
258impl PageStorage for BlockPageStg {
259 fn is_new(&self) -> bool {
260 self.is_new
261 }
262
263 fn new_page(&mut self) -> u64 {
264 if let Some(pn) = self.free_pn.pop_first() {
265 pn
266 } else {
267 self.header_dirty = true;
268 let pn = self.first_free_pn;
269 if pn != NOT_PN {
270 let (_fx, _size, next) = self.get_pn_info(pn);
271 self.first_free_pn = next;
272 pn
273 } else {
274 let pn = self.alloc_pn;
275 self.alloc_pn += 1;
276 pn
277 }
278 }
279 }
280
281 fn drop_page(&mut self, pn: u64) {
282 self.free_pn.insert(pn);
283 }
284
285 fn info(&self) -> Box<dyn PageStorageInfo> {
286 Box::new(self.psi.clone())
287 }
288
289 fn set_page(&mut self, pn: u64, data: Data) {
290 let size = data.len();
291 let fx = self.psi.index(size);
292 let ps = self.page_size(fx);
293 let (old_fx, mut old_size, mut ix) = self.get_pn_info(pn);
294 if size != old_size {
295 if fx != old_fx {
296 self.free_page(old_fx, ix);
297 ix = self.alloc(fx);
298 old_size = ps as usize - PAGE_HSIZE;
299 self.write(fx, ix * ps, &pn.to_le_bytes());
300 self.set_pn_info(pn, size, ix);
301 }
302 self.set_pn_info(pn, size, ix);
303 }
304
305 let off = PAGE_HSIZE as u64 + ix * ps;
306 self.write_data_n(fx, off, data, size);
307
308 if old_size > size {
310 self.write_data_n(fx, off + size as u64, self.zbytes.clone(), old_size - size);
311 }
312 }
313
314 fn get_page(&self, pn: u64) -> Data {
315 let (fx, size, ix) = self.get_pn_info(pn);
316 if fx == 0 {
317 return Data::default();
318 }
319 let mut data = vec![0; size];
320 let off = PAGE_HSIZE as u64 + ix * self.page_size(fx);
321 self.read(fx, off, &mut data);
322 Arc::new(data)
323 }
324
325 fn size(&self, pn: u64) -> usize {
326 self.get_pn_info(pn).1
327 }
328
329 fn save(&mut self) {
330 let flist = std::mem::take(&mut self.free_pn);
332 for pn in flist.iter().rev() {
333 let pn = *pn;
334 let (fx, _size, ix) = self.get_pn_info(pn);
335 self.free_page(fx, ix);
336 self.set_pn_info(pn, 0, self.first_free_pn);
337 self.first_free_pn = pn;
338 self.header_dirty = true;
339 }
340 if self.header_dirty {
341 self.write_header();
342 }
343 self.ds.save();
344 }
345
346 fn rollback(&mut self) {
347 self.free_pn.clear();
348 self.read_header();
349 }
350
351 fn wait_complete(&self) {
352 self.ds.wait_complete();
353 }
354
355 #[cfg(feature = "verify")]
356 fn get_free(&mut self) -> (crate::HashSet<u64>, u64) {
357 let mut free = crate::HashSet::default();
358 let mut pn = self.first_free_pn;
359 while pn != NOT_PN {
360 assert!(free.insert(pn));
361 let (_fx, _size, next) = self.get_pn_info(pn);
362 pn = next;
363 }
364 (free, self.alloc_pn)
365 }
366
367 #[cfg(feature = "renumber")]
368 fn load_free_pages(&mut self) -> Option<u64> {
369 let mut pn = self.first_free_pn;
370 if pn == NOT_PN {
371 return None;
372 }
373 while pn != NOT_PN {
374 let (_sx, _size, next) = self.get_pn_info(pn);
375 self.drop_page(pn);
376 pn = next;
377 }
378 self.first_free_pn = NOT_PN;
379 self.header_dirty = true;
380 Some(self.alloc_pn - self.free_pn.len() as u64)
381 }
382
383 #[cfg(feature = "renumber")]
384 fn renumber(&mut self, pn: u64) -> u64 {
385 let new_pn = self.new_page();
386 let (fx, size, ix) = self.get_pn_info(pn);
387 if fx != 0 {
388 let off = ix * self.page_size(fx);
389 self.write(fx, off, &new_pn.to_le_bytes());
390 }
391 self.set_pn_info(new_pn, size, ix);
392 self.set_pn_info(pn, 0, 0);
393 self.drop_page(pn);
394 new_pn
395 }
396
397 #[cfg(feature = "renumber")]
398 fn set_alloc_pn(&mut self, target: u64) {
399 assert!(self.first_free_pn == NOT_PN);
400 self.alloc_pn = target;
401 self.header_dirty = true;
402 self.free_pn.clear();
403 self.truncate(PN_FILE, self.header_size + target * 8);
404 }
405}
406
/// Parameters that determine the available page sizes.
#[derive(Clone)]
struct SizeInfo {
    /// Capacity of a block in bytes.
    blk_cap: usize,
    /// Divisor giving the slot size of the smallest size class: `blk_cap / max_div`.
    max_div: usize,
    /// Number of page size classes.
    sizes: usize,
}
413
414impl PageStorageInfo for SizeInfo {
415 fn sizes(&self) -> usize {
417 self.sizes
418 }
419
420 fn index(&self, size: usize) -> usize {
422 let r = self.blk_cap / (size + PAGE_HSIZE);
423 if r >= self.max_div {
424 1
425 } else {
426 1 + self.max_div - r
427 }
428 }
429
430 fn size(&self, ix: usize) -> usize {
432 debug_assert!(ix > 0 && ix <= self.sizes);
433 let size = self.blk_cap / (1 + self.max_div - ix);
434 size - PAGE_HSIZE
435 }
436}
437
/// Round trip: a page written and saved through one `BlockPageStg` can be read back,
/// with the same size and contents, through a second instance opened on the same storage.
#[test]
fn test_block_page_stg() {
    use atom_file::MemFile;

    let stg = MemFile::new();
    let limits = Limits::default();

    let mut bps = BlockPageStg::new(stg.clone(), &limits);
    // A fresh in-memory file is empty, so the storage must report itself as new.
    assert!(bps.is_new());

    let pn = bps.new_page();
    let data = Arc::new(b"hello george".to_vec());
    bps.set_page(pn, data.clone());
    bps.save();

    // Reopen on the same underlying storage.
    let mut bps = BlockPageStg::new(stg, &limits);
    assert!(!bps.is_new());
    assert_eq!(bps.size(pn), data.len());

    let data1 = bps.get_page(pn);
    // `assert_eq!` prints both buffers on failure, unlike `assert!(a == b)`.
    assert_eq!(data, data1);

    bps.save();
}