1use crate::{Data, Limits, PageStorage, PageStorageInfo, Storage, dividedstg, util};
2use dividedstg::{DividedStg, FD, FD_SIZE};
3use std::collections::BTreeSet;
4use std::sync::Arc;
5
/// Page storage built on top of a [`DividedStg`].
///
/// Sub-file 0 (`PN_FILE`) holds a header followed by one 8-byte info record per page number.
/// The remaining sub-files hold the page data, one sub-file per page size index.
pub struct BlockPageStg {
    /// Underlying divided storage.
    pub ds: DividedStg,
    /// Next page number to hand out when no freed page number is available.
    alloc_pn: u64,
    /// Head of the stored list of free page numbers, or `NOT_PN` when that list is empty.
    first_free_pn: u64,
    /// File descriptors: `fd[0]` is the page-number file, `fd[1..]` are the per-size sub-files.
    fd: Vec<FD>,
    /// Page numbers dropped since the last save; processed by `save`, discarded by `rollback`.
    free_pn: BTreeSet<u64>,
    /// Set when the header contents or a file descriptor changed and the header must be rewritten.
    header_dirty: bool,
    /// True if the underlying storage had size zero at construction.
    is_new: bool,
    /// Page size parameters.
    psi: SizeInfo,
    /// Size in bytes of the header at the start of the page-number file.
    header_size: u64,
    /// Buffer of zero bytes used to clear the unused tail of a page.
    zbytes: Data,
}
32
33const PN_FILE: usize = 0; const NOT_PN: u64 = u64::MAX >> 16; const PAGE_HSIZE: usize = 8; const HEADER_SIZE: usize = 24; impl BlockPageStg {
39 pub fn new(stg: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
41 let is_new = stg.size() == 0;
42
43 let sizes = lim.page_sizes;
44 let max_div = lim.max_div;
45 let ds = DividedStg::new(stg, lim.blk_cap);
46 let blk_cap = ds.blk_cap as usize;
47
48 let mut s = Self {
49 ds,
50 alloc_pn: 0,
51 first_free_pn: NOT_PN,
52 fd: Vec::new(),
53 free_pn: BTreeSet::default(),
54 header_dirty: true,
55 is_new,
56 psi: SizeInfo {
57 blk_cap,
58 max_div,
59 sizes,
60 },
61 header_size: 0,
62 zbytes: Data::default(),
63 };
64
65 if is_new {
66 for _i in 0..sizes + 1 {
67 s.fd.push(s.ds.new_file());
68 }
69 s.ds.set_root(&s.fd[0]);
70 } else {
71 s.read_header();
72 }
73
74 assert!(
76 s.psi.max_size_page() <= u16::MAX as usize,
77 "Max page size is 65535"
78 );
79
80 s.header_size = (HEADER_SIZE + s.psi.sizes * FD_SIZE) as u64;
81 s.zbytes = Arc::new(vec![0; s.psi.max_size_page()]);
82
83 #[cfg(feature = "log")]
84 println!("bps new alloc={:?}", &s.allocs());
85
86 Box::new(s)
87 }
88
89 fn read_header(&mut self) {
91 self.fd.clear();
92 self.fd.push(self.ds.get_root());
93
94 let mut buf = [0; HEADER_SIZE];
95 self.read(PN_FILE, 0, &mut buf);
96 self.alloc_pn = util::getu64(&buf, 0);
97 self.first_free_pn = util::getu64(&buf, 8);
98
99 self.psi.max_div = util::get(&buf, 16, 4) as usize;
100 self.psi.sizes = util::get(&buf, 20, 4) as usize;
101
102 let sizes = self.psi.sizes;
103 let mut buf = vec![0; FD_SIZE * sizes];
104 self.read(PN_FILE, HEADER_SIZE as u64, &mut buf);
105
106 for fx in 0..sizes {
107 let off = fx * FD_SIZE;
108 self.fd.push(self.ds.load_fd(&buf[off..]));
109 }
110 self.header_dirty = false;
111 }
112
113 fn write_header(&mut self) {
115 let mut buf = vec![0; self.header_size as usize];
116 util::setu64(&mut buf, self.alloc_pn);
117 util::setu64(&mut buf[8..], self.first_free_pn);
118 util::set(&mut buf, 16, self.psi.max_div as u64, 4);
119 util::set(&mut buf, 20, self.psi.sizes as u64, 4);
120
121 for fx in 0..self.psi.sizes {
122 let off = HEADER_SIZE + fx * FD_SIZE;
123 self.ds.save_fd(&self.fd[fx + 1], &mut buf[off..]);
124 }
125 self.write(PN_FILE, 0, &buf);
126 self.header_dirty = false;
127
128 #[cfg(feature = "log")]
129 println!("bps write_header allocs={:?}", &self.allocs());
130 }
131
132 #[cfg(feature = "log")]
133 fn allocs(&self) -> Vec<u64> {
134 (0..self.psi.sizes() + 1).map(|x| self.alloc(x)).collect()
135 }
136
137 fn page_size(&self, fx: usize) -> u64 {
139 (self.psi.size(fx) + PAGE_HSIZE) as u64
140 }
141
142 fn fsize(&self, fx: usize) -> u64 {
144 let size = self.fd[fx].size();
145 if fx == 0 && size < self.header_size {
146 self.header_size
147 } else {
148 size
149 }
150 }
151
152 fn alloc(&self, fx: usize) -> u64 {
154 let size = self.fsize(fx);
155 if fx == 0 {
156 (size - self.header_size) / 8
157 } else {
158 let ps = self.page_size(fx);
159 size.div_ceil(ps)
160 }
161 }
162
163 fn free_page(&mut self, fx: usize, ix: u64) {
165 if fx != 0 {
166 let last = self.alloc(fx) - 1;
167 let ps = self.page_size(fx);
168 if last != ix {
169 let mut buf = vec![0; ps as usize];
170 self.read(fx, last * ps, &mut buf);
171 let pn = util::getu64(&buf, 0);
172 let (fx1, _size, ix1) = self.get_pn_info(pn);
173 assert!(fx1 == fx && ix1 == last);
174 self.update_ix(pn, ix);
175 self.write_data(fx, ix * ps, Arc::new(buf));
176 }
177 self.truncate(fx, last * ps);
178 }
179 }
180
181 fn set_pn_info(&mut self, pn: u64, size: usize, ix: u64) {
183 let off = self.header_size + pn * 8;
184 let eof = self.fsize(PN_FILE);
185 if off > eof {
186 self.clear(PN_FILE, eof, off - eof);
187 }
188 let mut buf = [0; 8];
189 util::set(&mut buf, 0, ix, 6);
190 util::set(&mut buf, 6, size as u64, 2);
191 self.write(PN_FILE, off, &buf);
192 }
193
194 fn get_pn_info(&self, pn: u64) -> (usize, usize, u64) {
196 let off = self.header_size + pn * 8;
197 if off >= self.fsize(0) {
198 return (0, 0, 0);
199 }
200 let mut buf = [0; 8];
201 self.read(PN_FILE, off, &mut buf);
202 let ix = util::get(&buf, 0, 6);
203 let size = util::get(&buf, 6, 2) as usize;
204 let fx = if size == 0 { 0 } else { self.psi.index(size) };
205 (fx, size, ix)
206 }
207
208 fn update_ix(&mut self, pn: u64, ix: u64) {
210 let off = self.header_size + pn * 8;
211 self.write(PN_FILE, off, &ix.to_le_bytes()[0..6]);
212 }
213
214 fn clear(&mut self, fx: usize, off: u64, n: u64) {
216 let z = Arc::new(vec![0; n as usize]);
217 self.write_data(fx, off, z);
218 }
219
220 fn write(&mut self, fx: usize, off: u64, data: &[u8]) {
222 let data = Arc::new(data.to_vec());
223 self.write_data(fx, off, data);
224 }
225
226 fn write_data(&mut self, fx: usize, off: u64, data: Data) {
228 let n = data.len();
229 self.write_data_n(fx, off, data, n);
230 }
231
232 fn write_data_n(&mut self, fx: usize, off: u64, data: Data, n: usize) {
234 self.ds.write_data(&mut self.fd[fx], off, data, n);
235 self.save_fd(fx);
236 }
237
238 fn truncate(&mut self, fx: usize, off: u64) {
240 self.ds.truncate(&mut self.fd[fx], off);
241 self.save_fd(fx);
242 }
243
244 fn save_fd(&mut self, fx: usize) {
246 let fd = &mut self.fd[fx];
247 if fd.changed {
248 fd.changed = false;
249 self.header_dirty = true;
250 if fx == 0 {
251 self.ds.set_root(fd);
252 }
253 }
254 }
255
256 fn read(&self, fx: usize, off: u64, data: &mut [u8]) {
258 self.ds.read(&self.fd[fx], off, data);
259 }
260}
261
262impl PageStorage for BlockPageStg {
263 fn is_new(&self) -> bool {
264 self.is_new
265 }
266
267 fn new_page(&mut self) -> u64 {
268 if let Some(pn) = self.free_pn.pop_first() {
269 pn
270 } else {
271 self.header_dirty = true;
272 let pn = self.first_free_pn;
273 if pn != NOT_PN {
274 let (_fx, _size, next) = self.get_pn_info(pn);
275 self.first_free_pn = next;
276 pn
277 } else {
278 let pn = self.alloc_pn;
279 self.alloc_pn += 1;
280 pn
281 }
282 }
283 }
284
285 fn drop_page(&mut self, pn: u64) {
286 self.free_pn.insert(pn);
287 }
288
289 fn info(&self) -> Box<dyn PageStorageInfo> {
290 Box::new(self.psi.clone())
291 }
292
293 fn set_page(&mut self, pn: u64, data: Data) {
294 let size = data.len();
295 let fx = self.psi.index(size);
296 let ps = self.page_size(fx);
297 let (old_fx, mut old_size, mut ix) = self.get_pn_info(pn);
298 if size != old_size {
299 if fx != old_fx {
300 self.free_page(old_fx, ix);
301 ix = self.alloc(fx);
302 old_size = ps as usize - PAGE_HSIZE;
303 self.write(fx, ix * ps, &pn.to_le_bytes());
304 self.set_pn_info(pn, size, ix);
305 }
306 self.set_pn_info(pn, size, ix);
307 }
308
309 let off = PAGE_HSIZE as u64 + ix * ps;
310 self.write_data_n(fx, off, data, size);
311
312 if old_size > size {
314 self.write_data_n(fx, off + size as u64, self.zbytes.clone(), old_size - size);
315 }
316 }
317
318 fn get_page(&self, pn: u64) -> Data {
319 let (fx, size, ix) = self.get_pn_info(pn);
320 if fx == 0 {
321 return Data::default();
322 }
323 let mut data = vec![0; size];
324 let off = PAGE_HSIZE as u64 + ix * self.page_size(fx);
325 self.read(fx, off, &mut data);
326 Arc::new(data)
327 }
328
329 fn size(&self, pn: u64) -> usize {
330 self.get_pn_info(pn).1
331 }
332
333 fn save(&mut self) {
334 let flist = std::mem::take(&mut self.free_pn);
336 for pn in flist.iter().rev() {
337 let pn = *pn;
338 let (fx, _size, ix) = self.get_pn_info(pn);
339 self.free_page(fx, ix);
340 self.set_pn_info(pn, 0, self.first_free_pn);
341 self.first_free_pn = pn;
342 self.header_dirty = true;
343 }
344 if self.header_dirty {
345 self.write_header();
346 }
347 self.ds.save();
348 }
349
350 fn rollback(&mut self) {
351 self.free_pn.clear();
352 self.read_header();
353 }
354
355 fn wait_complete(&self) {
356 self.ds.wait_complete();
357 }
358
359 #[cfg(feature = "verify")]
360 fn get_free(&mut self) -> (crate::HashSet<u64>, u64) {
361 let mut free = crate::HashSet::default();
362 let mut pn = self.first_free_pn;
363 while pn != NOT_PN {
364 assert!(free.insert(pn));
365 let (_fx, _size, next) = self.get_pn_info(pn);
366 pn = next;
367 }
368 (free, self.alloc_pn)
369 }
370
371 #[cfg(feature = "renumber")]
372 fn load_free_pages(&mut self) -> Option<u64> {
373 let mut pn = self.first_free_pn;
374 if pn == NOT_PN {
375 return None;
376 }
377 while pn != NOT_PN {
378 let (_sx, _size, next) = self.get_pn_info(pn);
379 self.drop_page(pn);
380 pn = next;
381 }
382 self.first_free_pn = NOT_PN;
383 self.header_dirty = true;
384 Some(self.alloc_pn - self.free_pn.len() as u64)
385 }
386
387 #[cfg(feature = "renumber")]
388 fn renumber(&mut self, pn: u64) -> u64 {
389 let new_pn = self.new_page();
390 let (fx, size, ix) = self.get_pn_info(pn);
391 if fx != 0 {
392 let off = ix * self.page_size(fx);
393 self.write(fx, off, &new_pn.to_le_bytes());
394 }
395 self.set_pn_info(new_pn, size, ix);
396 self.set_pn_info(pn, 0, 0);
397 self.drop_page(pn);
398 new_pn
399 }
400
401 #[cfg(feature = "renumber")]
402 fn set_alloc_pn(&mut self, target: u64) {
403 assert!(self.first_free_pn == NOT_PN);
404 self.alloc_pn = target;
405 self.header_dirty = true;
406 self.free_pn.clear();
407 self.truncate(PN_FILE, self.header_size + target * 8);
408 }
409}
410
/// Parameters from which the page size for each size index is computed.
#[derive(Clone)]
struct SizeInfo {
    /// Block capacity, taken from the divided storage.
    blk_cap: usize,
    /// Divisor parameter: size index `ix` has a page slot of `blk_cap / (1 + max_div - ix)` bytes.
    max_div: usize,
    /// Number of page sizes (valid size indexes are `1..=sizes`).
    sizes: usize,
}
417
418impl PageStorageInfo for SizeInfo {
419 fn sizes(&self) -> usize {
421 self.sizes
422 }
423
424 fn index(&self, size: usize) -> usize {
426 let r = self.blk_cap / (size + PAGE_HSIZE);
427 if r >= self.max_div {
428 1
429 } else {
430 1 + self.max_div - r
431 }
432 }
433
434 fn size(&self, ix: usize) -> usize {
436 debug_assert!(ix > 0 && ix <= self.sizes);
437 let size = self.blk_cap / (1 + self.max_div - ix);
438 size - PAGE_HSIZE
439 }
440}
441
/// A page written and saved through one `BlockPageStg` can be read back through a second
/// instance constructed over the same storage.
#[test]
fn test_block_page_stg() {
    use atom_file::{BasicStorage, MemFile};
    let stg = MemFile::new();
    let limits = Limits::default();

    // Write a single page and save it.
    let mut writer = BlockPageStg::new(stg.clone(), &limits);
    let pn = writer.new_page();
    let payload = Arc::new(b"hello george".to_vec());
    writer.set_page(pn, payload.clone());
    writer.save();

    // Construct a second instance over the same storage and check the content.
    let mut reader = BlockPageStg::new(stg, &limits);
    let read_back = reader.get_page(pn);
    assert!(payload == read_back);

    reader.save();
}