1use atom_file::{Data, Storage};
33use heap::GHeap;
34use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
35use std::sync::{Arc, Mutex, RwLock};
36
37mod block;
38mod blockpagestg;
39mod dividedstg;
40mod heap;
41mod util;
42
43pub use blockpagestg::BlockPageStg;
44
45#[cfg(feature = "pstd")]
46use pstd::collections::BTreeMap;
47
48#[cfg(not(feature = "pstd"))]
49use std::collections::BTreeMap;
50
51#[derive(PartialEq, Eq, PartialOrd, Clone, Copy)]
53pub enum SaveOp {
54 Save,
55 RollBack,
56}
57
58pub trait PageStorage: Send + Sync {
60 fn is_new(&self) -> bool;
62 fn info(&self) -> Box<dyn PageStorageInfo>;
64 fn new_page(&mut self) -> u64;
66 fn drop_page(&mut self, pn: u64);
68 fn set_page(&mut self, pn: u64, data: Data);
70 fn get_page(&self, pn: u64) -> Data;
72 fn size(&self, pn: u64) -> usize;
74 fn save(&mut self);
76 fn rollback(&mut self);
78 fn wait_complete(&self);
80 #[cfg(feature = "verify")]
81 fn get_free(&mut self) -> (crate::HashSet<u64>, u64);
83 #[cfg(feature = "renumber")]
84 fn renumber(&mut self, pn: u64) -> u64;
86 #[cfg(feature = "renumber")]
87 fn load_free_pages(&mut self) -> Option<u64>;
89 #[cfg(feature = "renumber")]
90 fn set_alloc_pn(&mut self, target: u64);
92}
93
94pub trait PageStorageInfo: Send + Sync {
96 fn sizes(&self) -> usize;
98 fn index(&self, size: usize) -> usize;
100 fn size(&self, ix: usize) -> usize;
102 fn max_size_page(&self) -> usize {
104 self.size(self.sizes())
105 }
106 fn half_size_page(&self) -> usize {
108 self.size(self.index(self.max_size_page() / 2 - 50))
109 }
110 fn compress(&self, size: usize, saving: usize) -> bool {
112 self.index(size - saving) < self.index(size)
113 }
114}
115
116type HX = u32; type Heap = GHeap<u64, u64, HX>;
118
119type PageInfoPtr = Arc<Mutex<PageInfo>>;
121
122pub struct PageInfo {
124 pub current: Option<Data>,
126 pub history: BTreeMap<u64, Data>,
129 pub usage: u64,
131 pub hx: HX,
133}
134
135impl PageInfo {
136 fn new() -> PageInfoPtr {
137 Arc::new(Mutex::new(PageInfo {
138 current: None,
139 history: BTreeMap::new(),
140 usage: 0,
141 hx: HX::MAX,
142 }))
143 }
144
145 fn inc_usage(&mut self, lpnum: u64, ah: &mut Heap) {
147 self.usage += 1;
148 if self.hx == HX::MAX {
149 self.hx = ah.insert(lpnum, self.usage);
150 } else {
151 ah.modify(self.hx, self.usage);
152 }
153 }
154
155 fn get_data(&mut self, lpnum: u64, a: &AccessPagedData) -> (Data, usize) {
159 if !a.writer
160 && let Some((_k, v)) = self.history.range(a.time..).next()
161 {
162 return (v.clone(), 0);
163 }
164
165 if let Some(p) = &self.current {
166 return (p.clone(), 0);
167 }
168
169 let ps = a.spd.ps.read().unwrap();
171 let data = ps.get_page(lpnum);
172 self.current = Some(data.clone());
173 let len = data.len();
174 (data, len)
175 }
176
177 fn set_data(&mut self, time: u64, old: Data, data: Data, do_history: bool) -> (usize, usize) {
180 if do_history {
181 self.history.insert(time, old);
182 }
183 let old = if let Some(x) = &self.current {
184 x.len()
185 } else {
186 0
187 };
188 let new = data.len();
189 self.current = if new == 0 { None } else { Some(data) };
190 (old, new)
191 }
192
193 fn trim(&mut self, t: u64, start: u64) -> bool {
196 let first = self.history_start(t);
197 if first >= start {
198 self.history.remove(&t);
200 false
201 } else {
202 true
203 }
204 }
205
206 fn history_start(&self, t: u64) -> u64 {
208 if let Some((k, _)) = self.history.range(..t).next_back() {
209 *k + 1
210 } else {
211 0
212 }
213 }
214}
215
216#[derive(Default)]
218pub struct Stash {
219 time: u64,
221 pub pages: HashMap<u64, PageInfoPtr>,
223 rdrs: BTreeMap<u64, usize>,
225 vers: BTreeMap<u64, HashSet<u64>>,
227 pub total: i64, pub mem_limit: usize,
231 min: Heap,
233 pub read: u64,
235 pub miss: u64,
237}
238
239impl Stash {
240 fn set(&mut self, lpnum: u64, old: Data, data: Data) {
242 let time = self.time;
243 let u = self.vers.entry(time).or_default();
244 let do_history = u.insert(lpnum);
245 let p = self.get_pinfo(lpnum);
246 let diff = p.lock().unwrap().set_data(time, old, data, do_history);
247 self.delta(diff, false, false);
248 }
249
250 fn get_pinfo(&mut self, lpnum: u64) -> PageInfoPtr {
252 let p = self
253 .pages
254 .entry(lpnum)
255 .or_insert_with(PageInfo::new)
256 .clone();
257 p.lock().unwrap().inc_usage(lpnum, &mut self.min);
258 self.read += 1;
259 p
260 }
261
262 fn begin_read(&mut self) -> u64 {
264 let time = self.time;
265 let n = self.rdrs.entry(time).or_insert(0);
266 *n += 1;
267 time
268 }
269
270 fn end_read(&mut self, time: u64) {
272 let n = self.rdrs.get_mut(&time).unwrap();
273 *n -= 1;
274 if *n == 0 {
275 self.rdrs.remove(&time);
276 self.trim(time);
277 }
278 }
279
280 fn end_write(&mut self) -> usize {
283 let result = if let Some(u) = self.vers.get(&self.time) {
284 u.len()
285 } else {
286 0
287 };
288 let t = self.time;
289 self.time = t + 1;
290 self.trim(t);
291 result
292 }
293
294 fn trim(&mut self, time: u64) {
296 let (s, r) = (self.start(time), self.retain(time));
297 if s != r {
298 let mut empty = Vec::<u64>::new();
299 for (t, pl) in self.vers.range_mut(s..r) {
300 pl.retain(|pnum| {
301 let p = self.pages.get(pnum).unwrap();
302 p.lock().unwrap().trim(*t, s)
303 });
304 if pl.is_empty() {
305 empty.push(*t);
306 }
307 }
308 for t in empty {
309 self.vers.remove(&t);
310 }
311 }
312 }
313
314 fn start(&self, time: u64) -> u64 {
316 if let Some((t, _n)) = self.rdrs.range(..time).next_back() {
317 1 + *t
318 } else {
319 0
320 }
321 }
322
323 fn retain(&self, time: u64) -> u64 {
325 if let Some((t, _n)) = self.rdrs.range(time..).next() {
326 *t
327 } else {
328 self.time
329 }
330 }
331
332 fn delta(&mut self, d: (usize, usize), miss: bool, trim: bool) {
334 if miss {
335 self.miss += 1;
336 }
337 self.total += d.1 as i64 - d.0 as i64;
338 if trim {
339 self.trim_cache();
340 }
341 }
342
343 pub fn trim_cache(&mut self) {
345 while self.total > self.mem_limit as i64 && self.min.len() > 0 {
346 let lpnum = self.min.pop();
347 let mut p = self.pages.get(&lpnum).unwrap().lock().unwrap();
348 p.hx = HX::MAX;
349 if let Some(data) = &p.current {
350 self.total -= data.len() as i64;
351 p.current = None;
352 }
353 }
354 }
355
356 pub fn cached(&self) -> usize {
358 self.min.len() as usize
359 }
360}
361
362pub struct SharedPagedData {
364 pub ps: RwLock<Box<dyn PageStorage>>,
366 pub stash: Mutex<Stash>,
368 pub psi: Box<dyn PageStorageInfo>,
370}
371
372impl SharedPagedData {
373 pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
391 let limits = crate::Limits::default();
392 Self::new_from_ps(crate::blockpagestg::BlockPageStg::new(stg, &limits))
393 }
394
395 pub fn new_from_ps(ps: Box<dyn PageStorage>) -> Arc<Self> {
397 let stash = Stash {
399 mem_limit: 10 * 1024 * 1024,
400 ..Default::default()
401 };
402 let psi = ps.info();
403 Arc::new(Self {
404 stash: Mutex::new(stash),
405 ps: RwLock::new(ps),
406 psi,
407
408 })
409 }
410
411 pub fn new_reader(self: &Arc<Self>) -> AccessPagedData
413 {
414 let time = self.stash.lock().unwrap().begin_read();
415 AccessPagedData {
416 writer: false,
417 time,
418 spd: self.clone(),
419 }
420 }
421
422 pub fn new_writer(self: &Arc<Self>) -> AccessPagedData{
424 AccessPagedData {
425 writer: true,
426 time: 0,
427 spd: self.clone(),
428 }
429 }
430
431 pub fn wait_complete(&self) {
433 self.ps.read().unwrap().wait_complete();
434 }
435}
436
437pub struct AccessPagedData {
439 writer: bool,
440 time: u64,
441 pub spd: Arc<SharedPagedData>,
443}
444
445impl AccessPagedData {
446 pub fn new_reader(spd: Arc<SharedPagedData>) -> Self {
450 let time = spd.stash.lock().unwrap().begin_read();
451 AccessPagedData {
452 writer: false,
453 time,
454 spd,
455 }
456 }
457
458 pub fn new_writer(spd: Arc<SharedPagedData>) -> Self {
462 AccessPagedData {
463 writer: true,
464 time: 0,
465 spd,
466 }
467 }
468
469 pub fn stash(&self) -> std::sync::MutexGuard<'_, Stash> {
471 self.spd.stash.lock().unwrap()
472 }
473
474 pub fn get_data(&self, lpnum: u64) -> Data {
476 let pinfo = self.stash().get_pinfo(lpnum);
478
479 let (data, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);
481
482 if loaded > 0 {
483 self.stash().delta((0, loaded), true, true);
484 }
485 data
486 }
487
488 pub fn set_data(&self, lpnum: u64, data: Data) {
490 debug_assert!(self.writer);
491
492 let pinfo = self.stash().get_pinfo(lpnum);
494
495 let (old, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);
497
498 {
500 let s = &mut *self.stash();
501 if loaded > 0 {
502 s.delta((0, loaded), true, false);
503 }
504 s.set(lpnum, old, data.clone());
505 s.trim_cache();
506 }
507
508 if !data.is_empty() {
510 self.spd.ps.write().unwrap().set_page(lpnum, data);
511 } else {
512 self.spd.ps.write().unwrap().drop_page(lpnum);
513 }
514 }
515
516 pub fn alloc_page(&self) -> u64 {
518 debug_assert!(self.writer);
519 self.spd.ps.write().unwrap().new_page()
520 }
521
522 pub fn free_page(&self, lpnum: u64) {
524 self.set_data(lpnum, Data::default());
525 }
526
527 pub fn is_new(&self) -> bool {
529 self.writer && self.spd.ps.read().unwrap().is_new()
530 }
531
532 pub fn compress(&self, size: usize, saving: usize) -> bool {
534 debug_assert!(self.writer);
535 self.spd.psi.compress(size, saving)
536 }
537
538 pub fn save(&self, op: SaveOp) -> usize {
540 debug_assert!(self.writer);
541 match op {
542 SaveOp::Save => {
543 self.spd.ps.write().unwrap().save();
544 self.stash().end_write()
545 }
546 SaveOp::RollBack => {
547 self.spd.ps.write().unwrap().rollback();
550 0
551 }
552 }
553 }
554
555 #[cfg(feature = "renumber")]
557 pub fn renumber_page(&self, lpnum: u64) -> u64 {
558 assert!(self.writer);
559 let data = self.get_data(lpnum);
560 self.stash().set(lpnum, data.clone(), Data::default());
561 let lpnum2 = self.spd.ps.write().unwrap().renumber(lpnum);
562 debug_assert!(
563 self.stash()
564 .get_pinfo(lpnum2)
565 .lock()
566 .unwrap()
567 .current
568 .is_none()
569 );
570 let old2 = self.get_data(lpnum2);
571 self.stash().set(lpnum2, old2, data);
572 lpnum2
573 }
574}
575
576impl Drop for AccessPagedData {
577 fn drop(&mut self) {
578 if !self.writer {
579 self.stash().end_read(self.time);
580 }
581 }
582}
583
584#[non_exhaustive]
586pub struct Limits {
587 pub af_lim: atom_file::Limits,
589 pub blk_cap: u64,
591 pub page_sizes: usize,
593 pub max_div: usize,
595}
596
597impl Default for Limits {
598 fn default() -> Self {
599 Self {
600 af_lim: atom_file::Limits::default(),
601 blk_cap: 27720,
602 page_sizes: 7,
603 max_div: 12,
604 }
605 }
606}