1#![deny(missing_docs)]
33
34use atom_file::{Data, Storage};
35use heap::GHeap;
36pub use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
37use std::sync::{Arc, Mutex, RwLock};
38
39mod block;
40mod blockpagestg;
41mod dividedstg;
42mod heap;
43mod util;
44
45pub use blockpagestg::BlockPageStg;
46
47#[cfg(feature = "pstd")]
48use pstd::collections::BTreeMap;
49
50#[cfg(not(feature = "pstd"))]
51use std::collections::BTreeMap;
52
/// Operation to be performed by [`AccessPagedData::save`].
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone, Copy)]
pub enum SaveOp {
    /// Commit changes to the backing storage.
    Save,
    /// Discard changes made since the last save.
    RollBack,
}
61
/// Interface to a logical-page storage backend.
///
/// Pages are identified by `u64` logical page numbers and hold arbitrary
/// byte [`Data`].
pub trait PageStorage: Send + Sync {
    /// Is the underlying storage new (not previously written)?
    fn is_new(&self) -> bool;
    /// Information about the page sizes supported by this storage.
    fn info(&self) -> Box<dyn PageStorageInfo>;
    /// Allocate a new logical page number.
    fn new_page(&mut self) -> u64;
    /// Release a logical page number.
    fn drop_page(&mut self, pn: u64);
    /// Set the data stored for the specified page.
    fn set_page(&mut self, pn: u64, data: Data);
    /// Get the data stored for the specified page.
    fn get_page(&self, pn: u64) -> Data;
    /// Get the current size of the specified page.
    fn size(&self, pn: u64) -> usize;
    /// Persist outstanding changes.
    fn save(&mut self);
    /// Undo changes made since the last save.
    fn rollback(&mut self);
    /// Wait for any pending save activity to finish.
    /// NOTE(review): semantics inferred from the name — confirm with implementors.
    fn wait_complete(&self);
    /// Get the set of free pages and the allocation watermark (verification builds only).
    #[cfg(feature = "verify")]
    fn get_free(&mut self) -> (crate::HashSet<u64>, u64);
    /// Renumber the specified page, returning its new page number.
    #[cfg(feature = "renumber")]
    fn renumber(&mut self, pn: u64) -> u64;
    /// Load the free-page information; return value meaning inferred from
    /// the signature only. NOTE(review): confirm with implementors.
    #[cfg(feature = "renumber")]
    fn load_free_pages(&mut self) -> Option<u64>;
    /// Set the target for page-number allocation (used after renumbering).
    #[cfg(feature = "renumber")]
    fn set_alloc_pn(&mut self, target: u64);
}
97
/// Information about the page sizes supported by a [`PageStorage`].
pub trait PageStorageInfo: Send + Sync {
    /// The number of different page sizes supported.
    fn sizes(&self) -> usize;
    /// The smallest size index sufficient to hold `size` bytes.
    fn index(&self, size: usize) -> usize;
    /// The page size for size index `ix`.
    /// NOTE(review): `max_size_page` passes `sizes()` itself as an index,
    /// so indexes appear to be 1-based — confirm.
    fn size(&self, ix: usize) -> usize;
    /// Size of the largest supported page.
    fn max_size_page(&self) -> usize {
        self.size(self.sizes())
    }
    /// A page size roughly half the maximum.
    /// NOTE(review): the `- 50` adjustment looks like a header/overhead
    /// allowance — confirm against the storage implementation.
    fn half_size_page(&self) -> usize {
        self.size(self.index(self.max_size_page() / 2 - 50))
    }
    /// Would shrinking a page of `size` bytes by `saving` bytes move it
    /// into a smaller size class (making compression worthwhile)?
    fn compress(&self, size: usize, saving: usize) -> bool {
        self.index(size - saving) < self.index(size)
    }
}
119
// HX: index type for the usage heap; `HX::MAX` marks "not currently in the heap".
// Heap: orders logical page numbers (u64) by usage count (u64).
type HX = u32; type Heap = GHeap<u64, u64, HX>;

// Shared, mutex-protected cache entry for one logical page.
type PageInfoPtr = Arc<Mutex<PageInfo>>;
125
/// Cached information about a logical page.
pub struct PageInfo {
    /// Latest page data, if cached (`None` also when the page is empty/freed).
    pub current: Option<Data>,
    /// Historic page versions, keyed by the write time at which they were replaced.
    pub history: BTreeMap<u64, Data>,
    /// Access count, used to order pages for cache eviction.
    pub usage: u64,
    /// Position of this page in the usage heap; `HX::MAX` means not in the heap.
    pub hx: HX,
}
138
impl PageInfo {
    /// Construct a new, empty cache entry: no current data, no history,
    /// not yet registered in the usage heap.
    fn new() -> PageInfoPtr {
        Arc::new(Mutex::new(PageInfo {
            current: None,
            history: BTreeMap::new(),
            usage: 0,
            hx: HX::MAX,
        }))
    }

    /// Increment the usage counter and (re)position this page in the
    /// usage heap `ah`.
    fn inc_usage(&mut self, lpnum: u64, ah: &mut Heap) {
        self.usage += 1;
        if self.hx == HX::MAX {
            // Not yet in the heap: insert and remember the heap index.
            self.hx = ah.insert(lpnum, self.usage);
        } else {
            ah.modify(self.hx, self.usage);
        }
    }

    /// Get the page data visible to accessor `a`.
    ///
    /// A reader sees the first historic version recorded at or after its
    /// pinned time; otherwise the cached current data is returned, loading
    /// it from the underlying storage on a cache miss.
    ///
    /// Returns `(data, loaded)` where `loaded` is the number of bytes newly
    /// read into the cache (0 on a cache hit) — used for memory accounting
    /// by the callers.
    fn get_data(&mut self, lpnum: u64, a: &AccessPagedData) -> (Data, usize) {
        if !a.writer
            && let Some((_k, v)) = self.history.range(a.time..).next()
        {
            return (v.clone(), 0);
        }

        if let Some(p) = &self.current {
            return (p.clone(), 0);
        }

        // Cache miss: load from the underlying page storage and cache it.
        let ps = a.spd.ps.read().unwrap();
        let data = ps.get_page(lpnum);
        self.current = Some(data.clone());
        let len = data.len();
        (data, len)
    }

    /// Replace the current page data, recording the replaced data `old` in
    /// `history` at `time` when `do_history` is set (callers set it only on
    /// the first write to the page at that time).
    ///
    /// Returns `(old_len, new_len)` cached-byte counts for memory accounting.
    fn set_data(&mut self, time: u64, old: Data, data: Data, do_history: bool) -> (usize, usize) {
        if do_history {
            self.history.insert(time, old);
        }
        let old = if let Some(x) = &self.current {
            x.len()
        } else {
            0
        };
        let new = data.len();
        // Empty data means the page is logically free: drop the cached copy.
        self.current = if new == 0 { None } else { Some(data) };
        (old, new)
    }

    /// Consider removing the history entry at time `t`, given that no reader
    /// remains pinned before time `start`.
    ///
    /// Returns whether the caller's version map should retain the entry
    /// (`false` once the history entry has been removed here).
    fn trim(&mut self, t: u64, start: u64) -> bool {
        let first = self.history_start(t);
        if first >= start {
            // The whole interval covered by this version begins at or after
            // `start`, so no pinned reader can still need it.
            self.history.remove(&t);
            false
        } else {
            true
        }
    }

    /// First time covered by the history entry at time `t`: one past the key
    /// of the preceding history entry, or 0 if there is none.
    fn history_start(&self, t: u64) -> u64 {
        if let Some((k, _)) = self.history.range(..t).next_back() {
            *k + 1
        } else {
            0
        }
    }
}
219
/// Cache of logical pages, keeping historic versions for pinned readers.
#[derive(Default)]
pub struct Stash {
    // Current write time; incremented by `end_write`.
    time: u64,
    /// Cache entry per logical page number.
    pub pages: HashMap<u64, PageInfoPtr>,
    // Number of active readers pinned at each time.
    rdrs: BTreeMap<u64, usize>,
    // Set of pages written at each time; drives history trimming.
    vers: BTreeMap<u64, HashSet<u64>>,
    /// Total cached bytes (signed for simple delta arithmetic) and the cache memory limit.
    pub total: i64, pub mem_limit: usize,
    // Usage heap from which eviction victims are popped in `trim_cache`.
    min: Heap,
    /// Number of cache reads (statistics).
    pub read: u64,
    /// Number of cache misses (statistics).
    pub miss: u64,
}
242
impl Stash {
    /// Record that page `lpnum` now holds `data`, preserving `old` in
    /// history if this is the first write to the page at the current time.
    fn set(&mut self, lpnum: u64, old: Data, data: Data) {
        let time = self.time;
        let u = self.vers.entry(time).or_default();
        // `insert` returns true only for the first write at this time, so
        // `old` is saved to history at most once per page per time.
        let do_history = u.insert(lpnum);
        let p = self.get_pinfo(lpnum);
        let diff = p.lock().unwrap().set_data(time, old, data, do_history);
        self.delta(diff, false, false);
    }

    /// Get (or create) the cache entry for a page, bumping its usage count.
    fn get_pinfo(&mut self, lpnum: u64) -> PageInfoPtr {
        let p = self
            .pages
            .entry(lpnum)
            .or_insert_with(PageInfo::new)
            .clone();
        p.lock().unwrap().inc_usage(lpnum, &mut self.min);
        self.read += 1;
        p
    }

    /// Register a new reader; returns the time it is pinned at.
    fn begin_read(&mut self) -> u64 {
        let time = self.time;
        let n = self.rdrs.entry(time).or_insert(0);
        *n += 1;
        time
    }

    /// Deregister a reader pinned at `time`; when it was the last reader at
    /// that time, trim the history it alone was keeping alive.
    fn end_read(&mut self, time: u64) {
        let n = self.rdrs.get_mut(&time).unwrap();
        *n -= 1;
        if *n == 0 {
            self.rdrs.remove(&time);
            self.trim(time);
        }
    }

    /// Complete a write: advance the time and trim history for the
    /// completed time. Returns the number of pages updated at that time.
    fn end_write(&mut self) -> usize {
        let result = if let Some(u) = self.vers.get(&self.time) {
            u.len()
        } else {
            0
        };
        let t = self.time;
        self.time = t + 1;
        self.trim(t);
        result
    }

    /// Trim history in the half-open time interval `[start(time), retain(time))`
    /// that is no longer reachable by any pinned reader.
    fn trim(&mut self, time: u64) {
        let (s, r) = (self.start(time), self.retain(time));
        if s != r {
            let mut empty = Vec::<u64>::new();
            for (t, pl) in self.vers.range_mut(s..r) {
                // Keep only the pages whose history entry at `t` must survive.
                pl.retain(|pnum| {
                    let p = self.pages.get(pnum).unwrap();
                    p.lock().unwrap().trim(*t, s)
                });
                if pl.is_empty() {
                    empty.push(*t);
                }
            }
            // Remove version sets that became empty (can't remove while iterating).
            for t in empty {
                self.vers.remove(&t);
            }
        }
    }

    /// Earliest time whose history may be trimmed: one past the latest
    /// reader pinned before `time`, or 0 if there is none.
    fn start(&self, time: u64) -> u64 {
        if let Some((t, _n)) = self.rdrs.range(..time).next_back() {
            1 + *t
        } else {
            0
        }
    }

    /// Earliest time that must be retained: the next reader pinned at or
    /// after `time`, or the current time if there is none.
    fn retain(&self, time: u64) -> u64 {
        if let Some((t, _n)) = self.rdrs.range(time..).next() {
            *t
        } else {
            self.time
        }
    }

    /// Adjust the cached-byte total by `d = (old_len, new_len)`, optionally
    /// counting a cache miss and optionally trimming the cache afterwards.
    fn delta(&mut self, d: (usize, usize), miss: bool, trim: bool) {
        if miss {
            self.miss += 1;
        }
        self.total += d.1 as i64 - d.0 as i64;
        if trim {
            self.trim_cache();
        }
    }

    /// Evict cached page data (lowest usage first, per the heap order) until
    /// the total is within the memory limit or the heap is empty.
    pub fn trim_cache(&mut self) {
        while self.total > self.mem_limit as i64 && self.min.len() > 0 {
            let lpnum = self.min.pop();
            let mut p = self.pages.get(&lpnum).unwrap().lock().unwrap();
            // Mark as no longer in the heap before releasing the data.
            p.hx = HX::MAX;
            if let Some(data) = &p.current {
                self.total -= data.len() as i64;
                p.current = None;
            }
        }
    }

    /// Number of pages currently tracked by the usage heap.
    pub fn cached(&self) -> usize {
        self.min.len() as usize
    }
}
365
/// Page data shared between one writer and multiple pinned readers.
pub struct SharedPagedData {
    /// The underlying page storage.
    pub ps: RwLock<Box<dyn PageStorage>>,
    /// Cache of page data and version history.
    pub stash: Mutex<Stash>,
    /// Information about the page sizes supported by `ps`.
    pub psi: Box<dyn PageStorageInfo>,
}
375
376impl SharedPagedData {
377 pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
395 let limits = crate::Limits::default();
396 Self::new_from_ps(crate::blockpagestg::BlockPageStg::new(stg, &limits))
397 }
398
399 pub fn new_from_ps(ps: Box<dyn PageStorage>) -> Arc<Self> {
401 let stash = Stash {
403 mem_limit: 10 * 1024 * 1024,
404 ..Default::default()
405 };
406 let psi = ps.info();
407 Arc::new(Self {
408 stash: Mutex::new(stash),
409 ps: RwLock::new(ps),
410 psi,
411
412 })
413 }
414
415 pub fn new_reader(self: &Arc<Self>) -> AccessPagedData
417 {
418 let time = self.stash.lock().unwrap().begin_read();
419 AccessPagedData {
420 writer: false,
421 time,
422 spd: self.clone(),
423 }
424 }
425
426 pub fn new_writer(self: &Arc<Self>) -> AccessPagedData{
428 AccessPagedData {
429 writer: true,
430 time: 0,
431 spd: self.clone(),
432 }
433 }
434
435 pub fn wait_complete(&self) {
437 self.ps.read().unwrap().wait_complete();
438 }
439}
440
/// Access handle to [`SharedPagedData`] — either the writer, or a reader
/// pinned at a fixed time.
pub struct AccessPagedData {
    // True for the writer handle; readers see a snapshot at `time`.
    writer: bool,
    // Time the reader is pinned at (0 and unused for the writer).
    time: u64,
    /// The shared paged data being accessed.
    pub spd: Arc<SharedPagedData>,
}
448
impl AccessPagedData {
    /// Construct a read-only access handle pinned at the current time.
    #[deprecated(note="use SharedPagedData::new_reader instead")]
    pub fn new_reader(spd: Arc<SharedPagedData>) -> Self {
        let time = spd.stash.lock().unwrap().begin_read();
        AccessPagedData {
            writer: false,
            time,
            spd,
        }
    }

    /// Construct a writer access handle.
    #[deprecated(note="use SharedPagedData::new_writer instead")]
    pub fn new_writer(spd: Arc<SharedPagedData>) -> Self {
        AccessPagedData {
            writer: true,
            time: 0,
            spd,
        }
    }

    /// Lock and return the shared stash.
    pub fn stash(&self) -> std::sync::MutexGuard<'_, Stash> {
        self.spd.stash.lock().unwrap()
    }

    /// Get the data for the specified page.
    pub fn get_data(&self, lpnum: u64) -> Data {
        // Take the stash lock only long enough to fetch the per-page entry;
        // the guard is a temporary and is released on this line.
        let pinfo = self.stash().get_pinfo(lpnum);

        // Read the data holding only the page lock (plus a storage read
        // lock on a cache miss), not the stash lock.
        let (data, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        // Account for any bytes newly loaded into the cache; may trim.
        if loaded > 0 {
            self.stash().delta((0, loaded), true, true);
        }
        data
    }

    /// Set the data for the specified page (writer only).
    pub fn set_data(&self, lpnum: u64, data: Data) {
        debug_assert!(self.writer);

        let pinfo = self.stash().get_pinfo(lpnum);

        // Fetch the old data so it can be preserved in history for readers.
        let (old, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        {
            let s = &mut *self.stash();
            if loaded > 0 {
                s.delta((0, loaded), true, false);
            }
            s.set(lpnum, old, data.clone());
            s.trim_cache();
        }

        // Propagate to the underlying storage; empty data frees the page.
        if !data.is_empty() {
            self.spd.ps.write().unwrap().set_page(lpnum, data);
        } else {
            self.spd.ps.write().unwrap().drop_page(lpnum);
        }
    }

    /// Allocate a new page number (writer only).
    pub fn alloc_page(&self) -> u64 {
        debug_assert!(self.writer);
        self.spd.ps.write().unwrap().new_page()
    }

    /// Free the specified page (writer only).
    pub fn free_page(&self, lpnum: u64) {
        // An empty Data value marks the page as free (see set_data).
        self.set_data(lpnum, Data::default());
    }

    /// Is the underlying storage new? Always false for readers.
    pub fn is_new(&self) -> bool {
        self.writer && self.spd.ps.read().unwrap().is_new()
    }

    /// Would saving `saving` bytes from a page of `size` bytes move it into
    /// a smaller size class (i.e. is compression worthwhile)? Writer only.
    pub fn compress(&self, size: usize, saving: usize) -> bool {
        debug_assert!(self.writer);
        self.spd.psi.compress(size, saving)
    }

    /// Save or roll back changes (writer only). For [`SaveOp::Save`],
    /// returns the number of pages updated; for [`SaveOp::RollBack`], 0.
    pub fn save(&self, op: SaveOp) -> usize {
        debug_assert!(self.writer);
        match op {
            SaveOp::Save => {
                self.spd.ps.write().unwrap().save();
                self.stash().end_write()
            }
            SaveOp::RollBack => {
                // NOTE(review): only the underlying storage is rolled back
                // here; confirm the stash needs no reset for the current time.
                self.spd.ps.write().unwrap().rollback();
                0
            }
        }
    }

    /// Renumber a page, moving its data to a new page number which is
    /// returned (writer only).
    #[cfg(feature = "renumber")]
    pub fn renumber_page(&self, lpnum: u64) -> u64 {
        assert!(self.writer);
        let data = self.get_data(lpnum);
        // Clear the old page in the stash; history keeps the old data
        // available for pinned readers.
        self.stash().set(lpnum, data.clone(), Data::default());
        let lpnum2 = self.spd.ps.write().unwrap().renumber(lpnum);
        // The new page number should not already have cached data.
        debug_assert!(
            self.stash()
                .get_pinfo(lpnum2)
                .lock()
                .unwrap()
                .current
                .is_none()
        );
        let old2 = self.get_data(lpnum2);
        self.stash().set(lpnum2, old2, data);
        lpnum2
    }
}
577
578impl Drop for AccessPagedData {
579 fn drop(&mut self) {
580 if !self.writer {
581 self.stash().end_read(self.time);
582 }
583 }
584}
585
/// Tuning limits used when constructing a [`BlockPageStg`].
#[non_exhaustive]
pub struct Limits {
    /// Limits for the underlying atomic-file storage.
    pub af_lim: atom_file::Limits,
    /// Block capacity. NOTE(review): unit (bytes?) not visible here — confirm
    /// against `blockpagestg`.
    pub blk_cap: u64,
    /// Number of different page sizes supported.
    pub page_sizes: usize,
    /// Maximum divisions. NOTE(review): semantics defined in `dividedstg` /
    /// `blockpagestg` — confirm there.
    pub max_div: usize,
}
598
599impl Default for Limits {
600 fn default() -> Self {
601 Self {
602 af_lim: atom_file::Limits::default(),
603 blk_cap: 27720,
604 page_sizes: 7,
605 max_div: 12,
606 }
607 }
608}