1use atom_file::{Data, Storage};
33use heap::GHeap;
34pub use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
35use std::sync::{Arc, Mutex, RwLock};
36
37mod block;
38mod blockpagestg;
39mod dividedstg;
40mod heap;
41mod util;
42
43pub use blockpagestg::BlockPageStg;
44
45#[cfg(feature = "pstd")]
46use pstd::collections::BTreeMap;
47
48#[cfg(not(feature = "pstd"))]
49use std::collections::BTreeMap;
50
/// Operation passed to `AccessPagedData::save`: commit the current batch of
/// updates to storage, or discard them.
///
/// `Debug` and `Ord` are derived for a consistent common-trait set
/// (the original derived `PartialOrd` + `Eq` without `Ord` or `Debug`).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub enum SaveOp {
    /// Persist outstanding updates.
    Save,
    /// Discard outstanding updates, reverting to the last save.
    RollBack,
}
57
/// Backing storage for numbered pages of data, with batch save/rollback.
///
/// Implementations persist pages written via [`set_page`](Self::set_page)
/// when [`save`](Self::save) is called, or revert them on
/// [`rollback`](Self::rollback).
pub trait PageStorage: Send + Sync {
    /// True if the storage is newly created (no previously saved data).
    fn is_new(&self) -> bool;
    /// Information about the page sizes this storage supports.
    fn info(&self) -> Box<dyn PageStorageInfo>;
    /// Allocate a new page number.
    fn new_page(&mut self) -> u64;
    /// Drop (free) the specified page.
    fn drop_page(&mut self, pn: u64);
    /// Set the data for the specified page.
    fn set_page(&mut self, pn: u64, data: Data);
    /// Get the data for the specified page.
    fn get_page(&self, pn: u64) -> Data;
    /// Current size in bytes of the specified page.
    fn size(&self, pn: u64) -> usize;
    /// Persist outstanding updates.
    fn save(&mut self);
    /// Discard outstanding updates, reverting to the last save.
    fn rollback(&mut self);
    /// Wait until any background save activity has completed.
    fn wait_complete(&self);
    /// Get the set of free page numbers and the allocation limit
    /// (verification support).
    #[cfg(feature = "verify")]
    fn get_free(&mut self) -> (crate::HashSet<u64>, u64);
    /// Renumber page `pn`, returning its new page number.
    /// NOTE(review): presumably renumbers to a lower free number — confirm
    /// against the implementation.
    #[cfg(feature = "renumber")]
    fn renumber(&mut self, pn: u64) -> u64;
    /// Load the free-page list. NOTE(review): meaning of the returned
    /// `Option<u64>` is not visible here — TODO confirm (likely the
    /// next/limit page number, `None` on failure).
    #[cfg(feature = "renumber")]
    fn load_free_pages(&mut self) -> Option<u64>;
    /// Set the next page number to be allocated.
    #[cfg(feature = "renumber")]
    fn set_alloc_pn(&mut self, target: u64);
}
93
/// Information about the discrete page sizes a [`PageStorage`] supports.
///
/// Sizes are indexed; `max_size_page` calling `size(sizes())` implies the
/// valid index range runs up to and including `sizes()`.
pub trait PageStorageInfo: Send + Sync {
    /// The number of distinct page sizes.
    fn sizes(&self) -> usize;
    /// The size index for a given page size in bytes.
    fn index(&self, size: usize) -> usize;
    /// The page size in bytes for a given size index.
    fn size(&self, ix: usize) -> usize;
    /// The largest supported page size.
    fn max_size_page(&self) -> usize {
        self.size(self.sizes())
    }
    /// A supported page size of roughly half the maximum.
    /// NOTE(review): `max_size_page() / 2 - 50` underflows (panics in debug
    /// builds) if the maximum page size is below 100 bytes — presumably real
    /// implementations always exceed that; TODO confirm.
    fn half_size_page(&self) -> usize {
        self.size(self.index(self.max_size_page() / 2 - 50))
    }
    /// Whether re-saving a page is worthwhile: true when shrinking `size`
    /// by `saving` bytes moves the page into a smaller size class.
    fn compress(&self, size: usize, saving: usize) -> bool {
        self.index(size - saving) < self.index(size)
    }
}
115
116type HX = u32; type Heap = GHeap<u64, u64, HX>;
118
119type PageInfoPtr = Arc<Mutex<PageInfo>>;
121
/// Cached state of a single logical page.
pub struct PageInfo {
    /// Latest page data; `None` when not cached (evicted, or page dropped).
    pub current: Option<Data>,
    /// Historic versions keyed by write time, retained while readers that
    /// started at or before that time are still active.
    pub history: BTreeMap<u64, Data>,
    /// Access counter; drives least-used eviction via the usage heap.
    pub usage: u64,
    /// This page's index in the usage heap; `HX::MAX` when not in the heap.
    pub hx: HX,
}
134
impl PageInfo {
    /// Construct a new, empty, shareable page cache entry.
    fn new() -> PageInfoPtr {
        Arc::new(Mutex::new(PageInfo {
            current: None,
            history: BTreeMap::new(),
            usage: 0,
            hx: HX::MAX,
        }))
    }

    /// Record one more access to page `lpnum`, inserting into or updating
    /// the usage heap `ah` so eviction favours least-used pages.
    fn inc_usage(&mut self, lpnum: u64, ah: &mut Heap) {
        self.usage += 1;
        if self.hx == HX::MAX {
            // Not yet in the heap: insert and remember our heap index.
            self.hx = ah.insert(lpnum, self.usage);
        } else {
            // Already in the heap: update our recorded usage.
            ah.modify(self.hx, self.usage);
        }
    }

    /// Get the page data as seen by accessor `a`.
    ///
    /// Returns `(data, loaded)` where `loaded` is the number of bytes read
    /// from backing storage (0 when served from history or the cache).
    fn get_data(&mut self, lpnum: u64, a: &AccessPagedData) -> (Data, usize) {
        // A reader sees the first historic version written at or after its
        // start time — i.e. the data as it stood when the reader began.
        if !a.writer
            && let Some((_k, v)) = self.history.range(a.time..).next()
        {
            return (v.clone(), 0);
        }

        if let Some(p) = &self.current {
            return (p.clone(), 0);
        }

        // Cache miss: load from the underlying page storage and cache it.
        // Note: the storage read lock is taken while this PageInfo is locked.
        let ps = a.spd.ps.read().unwrap();
        let data = ps.get_page(lpnum);
        self.current = Some(data.clone());
        let len = data.len();
        (data, len)
    }

    /// Install new page data written at `time`.
    ///
    /// `old` (the previous data) is saved to history when `do_history` is
    /// set — i.e. on the first write of this page in the current time slot.
    /// Returns `(old_len, new_len)` for cache-size accounting.
    fn set_data(&mut self, time: u64, old: Data, data: Data, do_history: bool) -> (usize, usize) {
        if do_history {
            self.history.insert(time, old);
        }
        // Deliberate shadowing: from here `old` is the cached byte count
        // being replaced (0 if nothing was cached).
        let old = if let Some(x) = &self.current {
            x.len()
        } else {
            0
        };
        let new = data.len();
        // Empty data means the page was dropped; cache nothing for it.
        self.current = if new == 0 { None } else { Some(data) };
        (old, new)
    }

    /// Consider removing the history entry at time `t`, where `start` is the
    /// earliest time any remaining reader may still need.
    ///
    /// Returns true if the entry must be retained, false if it was removed.
    fn trim(&mut self, t: u64, start: u64) -> bool {
        let first = self.history_start(t);
        if first >= start {
            // The entry's coverage begins at or after `start`, so no
            // remaining reader can need this version any more.
            self.history.remove(&t);
            false
        } else {
            true
        }
    }

    /// First time covered by the history entry at `t`: one past the previous
    /// entry's time, or 0 when `t` is the earliest entry.
    fn history_start(&self, t: u64) -> u64 {
        if let Some((k, _)) = self.history.range(..t).next_back() {
            *k + 1
        } else {
            0
        }
    }
}
215
/// Page cache plus multi-version bookkeeping shared by the writer and readers.
#[derive(Default)]
pub struct Stash {
    /// Current write time; advanced when the writer completes a batch.
    time: u64,
    /// Cached page state, keyed by logical page number.
    pub pages: HashMap<u64, PageInfoPtr>,
    /// Count of active readers per reader start time.
    rdrs: BTreeMap<u64, usize>,
    /// For each write time, the set of pages whose history was saved then.
    vers: BTreeMap<u64, HashSet<u64>>,
    /// Total cached bytes, and the limit above which the cache is trimmed.
    pub total: i64,
    pub mem_limit: usize,
    /// Usage-ordered heap of cached pages; pop yields the least-used page.
    min: Heap,
    /// Number of page accesses.
    pub read: u64,
    /// Number of accesses that missed the cache.
    pub miss: u64,
}
238
impl Stash {
    /// Set the data for page `lpnum`, saving `old` to the page's history the
    /// first time the page is written at the current time.
    fn set(&mut self, lpnum: u64, old: Data, data: Data) {
        let time = self.time;
        let u = self.vers.entry(time).or_default();
        // `insert` returns true only on the first write of this page in the
        // current time slot — only then is history saved.
        let do_history = u.insert(lpnum);
        let p = self.get_pinfo(lpnum);
        let diff = p.lock().unwrap().set_data(time, old, data, do_history);
        self.delta(diff, false, false);
    }

    /// Get the cache entry for page `lpnum`, creating it if necessary, and
    /// record the access in the usage heap and read counter.
    fn get_pinfo(&mut self, lpnum: u64) -> PageInfoPtr {
        let p = self
            .pages
            .entry(lpnum)
            .or_insert_with(PageInfo::new)
            .clone();
        p.lock().unwrap().inc_usage(lpnum, &mut self.min);
        self.read += 1;
        p
    }

    /// Register a reader starting now; returns the reader's view time.
    fn begin_read(&mut self) -> u64 {
        let time = self.time;
        let n = self.rdrs.entry(time).or_insert(0);
        *n += 1;
        time
    }

    /// Deregister a reader with view time `time`; when it was the last reader
    /// at that time, trim the history it was keeping alive.
    fn end_read(&mut self, time: u64) {
        let n = self.rdrs.get_mut(&time).unwrap();
        *n -= 1;
        if *n == 0 {
            self.rdrs.remove(&time);
            self.trim(time);
        }
    }

    /// Complete the writer's batch: advance time and trim history.
    /// Returns the number of pages updated in the completed batch.
    fn end_write(&mut self) -> usize {
        let result = if let Some(u) = self.vers.get(&self.time) {
            u.len()
        } else {
            0
        };
        let t = self.time;
        self.time = t + 1;
        self.trim(t);
        result
    }

    /// Trim history around `time` that no remaining reader can need.
    /// Called when a reader at `time` ends, or the writer finishes a batch.
    fn trim(&mut self, time: u64) {
        // Versions in [s, r) are covered by no remaining reader.
        let (s, r) = (self.start(time), self.retain(time));
        if s != r {
            let mut empty = Vec::<u64>::new();
            // Split borrow: `self.vers` is iterated mutably while the closure
            // reads the disjoint field `self.pages`.
            for (t, pl) in self.vers.range_mut(s..r) {
                pl.retain(|pnum| {
                    let p = self.pages.get(pnum).unwrap();
                    // Keep the page in this version set only if its history
                    // entry had to be retained.
                    p.lock().unwrap().trim(*t, s)
                });
                if pl.is_empty() {
                    empty.push(*t);
                }
            }
            // Drop version sets that no longer reference any page.
            for t in empty {
                self.vers.remove(&t);
            }
        }
    }

    /// Lower bound of trimmable history: one past the latest reader older
    /// than `time`, or 0 when there is none.
    fn start(&self, time: u64) -> u64 {
        if let Some((t, _n)) = self.rdrs.range(..time).next_back() {
            1 + *t
        } else {
            0
        }
    }

    /// Upper bound of trimmable history: the next reader at or after `time`,
    /// or the current time when there is none.
    fn retain(&self, time: u64) -> u64 {
        if let Some((t, _n)) = self.rdrs.range(time..).next() {
            *t
        } else {
            self.time
        }
    }

    /// Account for a cache size change `d = (old_len, new_len)`; optionally
    /// count a cache miss and/or trim the cache back under the memory limit.
    fn delta(&mut self, d: (usize, usize), miss: bool, trim: bool) {
        if miss {
            self.miss += 1;
        }
        self.total += d.1 as i64 - d.0 as i64;
        if trim {
            self.trim_cache();
        }
    }

    /// Evict least-used pages until total cached bytes is within `mem_limit`.
    /// Only `current` data is freed; page history is untouched.
    pub fn trim_cache(&mut self) {
        while self.total > self.mem_limit as i64 && self.min.len() > 0 {
            let lpnum = self.min.pop();
            let mut p = self.pages.get(&lpnum).unwrap().lock().unwrap();
            // Mark the page as no longer present in the heap.
            p.hx = HX::MAX;
            if let Some(data) = &p.current {
                self.total -= data.len() as i64;
                p.current = None;
            }
        }
    }

    /// Number of pages currently tracked in the usage heap.
    pub fn cached(&self) -> usize {
        self.min.len() as usize
    }
}
361
/// Page storage plus page cache, shared between a writer and readers.
pub struct SharedPagedData {
    /// Underlying page storage; the writer takes the write lock for updates.
    pub ps: RwLock<Box<dyn PageStorage>>,
    /// Page cache and multi-version bookkeeping.
    pub stash: Mutex<Stash>,
    /// Page-size information, obtained from the storage at construction.
    pub psi: Box<dyn PageStorageInfo>,
}
371
372impl SharedPagedData {
373 pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
391 let limits = crate::Limits::default();
392 Self::new_from_ps(crate::blockpagestg::BlockPageStg::new(stg, &limits))
393 }
394
395 pub fn new_from_ps(ps: Box<dyn PageStorage>) -> Arc<Self> {
397 let stash = Stash {
399 mem_limit: 10 * 1024 * 1024,
400 ..Default::default()
401 };
402 let psi = ps.info();
403 Arc::new(Self {
404 stash: Mutex::new(stash),
405 ps: RwLock::new(ps),
406 psi,
407
408 })
409 }
410
411 pub fn new_reader(self: &Arc<Self>) -> AccessPagedData
413 {
414 let time = self.stash.lock().unwrap().begin_read();
415 AccessPagedData {
416 writer: false,
417 time,
418 spd: self.clone(),
419 }
420 }
421
422 pub fn new_writer(self: &Arc<Self>) -> AccessPagedData{
424 AccessPagedData {
425 writer: true,
426 time: 0,
427 spd: self.clone(),
428 }
429 }
430
431 pub fn wait_complete(&self) {
433 self.ps.read().unwrap().wait_complete();
434 }
435}
436
/// Access to [`SharedPagedData`], either as the writer or as a reader with a
/// fixed view time. Readers deregister themselves on drop.
pub struct AccessPagedData {
    /// True for the writer, false for a reader.
    writer: bool,
    /// For readers: the stash time at which the read view was taken.
    time: u64,
    /// The shared paged data being accessed.
    pub spd: Arc<SharedPagedData>,
}
444
impl AccessPagedData {
    /// Register a new reader with a fixed view time.
    #[deprecated(note="use SharedPagedData::new_reader instead")]
    pub fn new_reader(spd: Arc<SharedPagedData>) -> Self {
        let time = spd.stash.lock().unwrap().begin_read();
        AccessPagedData {
            writer: false,
            time,
            spd,
        }
    }

    /// Construct the writer access object.
    #[deprecated(note="use SharedPagedData::new_writer instead")]
    pub fn new_writer(spd: Arc<SharedPagedData>) -> Self {
        AccessPagedData {
            writer: true,
            time: 0,
            spd,
        }
    }

    /// Lock and return the shared stash.
    pub fn stash(&self) -> std::sync::MutexGuard<'_, Stash> {
        self.spd.stash.lock().unwrap()
    }

    /// Get the data for the specified page, as seen by this accessor.
    pub fn get_data(&self, lpnum: u64) -> Data {
        // The stash guard is released at the end of this statement, so it is
        // never held while the page mutex (and possibly the storage read
        // lock, inside PageInfo::get_data) is taken.
        let pinfo = self.stash().get_pinfo(lpnum);

        let (data, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        // Account for bytes loaded from storage on a cache miss.
        if loaded > 0 {
            self.stash().delta((0, loaded), true, true);
        }
        data
    }

    /// Set the data for the specified page (writer only).
    /// Empty data drops the page from storage.
    pub fn set_data(&self, lpnum: u64, data: Data) {
        debug_assert!(self.writer);

        let pinfo = self.stash().get_pinfo(lpnum);

        // Fetch the current data first: it may need saving to page history
        // for readers that started before this write.
        let (old, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        {
            // Stash updates are done under one guard, dropped at block end
            // before the storage write lock is taken below.
            let s = &mut *self.stash();
            if loaded > 0 {
                s.delta((0, loaded), true, false);
            }
            s.set(lpnum, old, data.clone());
            s.trim_cache();
        }

        if !data.is_empty() {
            self.spd.ps.write().unwrap().set_page(lpnum, data);
        } else {
            self.spd.ps.write().unwrap().drop_page(lpnum);
        }
    }

    /// Allocate a new page number (writer only).
    pub fn alloc_page(&self) -> u64 {
        debug_assert!(self.writer);
        self.spd.ps.write().unwrap().new_page()
    }

    /// Free the specified page by setting its data to empty.
    pub fn free_page(&self, lpnum: u64) {
        self.set_data(lpnum, Data::default());
    }

    /// Is the underlying storage newly created? Always false for readers.
    pub fn is_new(&self) -> bool {
        self.writer && self.spd.ps.read().unwrap().is_new()
    }

    /// Whether re-saving a page of `size` bytes to reclaim `saving` bytes
    /// would move it to a smaller size class (writer only).
    pub fn compress(&self, size: usize, saving: usize) -> bool {
        debug_assert!(self.writer);
        self.spd.psi.compress(size, saving)
    }

    /// Commit or roll back outstanding updates (writer only).
    /// For `Save`, returns the number of pages updated in the batch;
    /// for `RollBack`, returns 0.
    pub fn save(&self, op: SaveOp) -> usize {
        debug_assert!(self.writer);
        match op {
            SaveOp::Save => {
                self.spd.ps.write().unwrap().save();
                self.stash().end_write()
            }
            SaveOp::RollBack => {
                // NOTE(review): only the storage is rolled back here; the
                // stash (cache/history) is not reverted — presumably callers
                // discard cached state separately. TODO confirm.
                self.spd.ps.write().unwrap().rollback();
                0
            }
        }
    }

    /// Renumber the specified page, moving its data to the new page number
    /// and emptying the old one. Returns the new page number.
    #[cfg(feature = "renumber")]
    pub fn renumber_page(&self, lpnum: u64) -> u64 {
        assert!(self.writer);
        let data = self.get_data(lpnum);
        // Empty the old page in the stash (its data goes to history).
        self.stash().set(lpnum, data.clone(), Data::default());
        let lpnum2 = self.spd.ps.write().unwrap().renumber(lpnum);
        // The target page is expected to have no cached data yet.
        debug_assert!(
            self.stash()
                .get_pinfo(lpnum2)
                .lock()
                .unwrap()
                .current
                .is_none()
        );
        let old2 = self.get_data(lpnum2);
        self.stash().set(lpnum2, old2, data);
        lpnum2
    }
}
573
574impl Drop for AccessPagedData {
575 fn drop(&mut self) {
576 if !self.writer {
577 self.stash().end_read(self.time);
578 }
579 }
580}
581
/// Configuration limits, passed to [`BlockPageStg`] at construction.
#[non_exhaustive]
pub struct Limits {
    /// Limits for the underlying atom file.
    pub af_lim: atom_file::Limits,
    /// Block capacity — presumably bytes per block; TODO confirm against
    /// BlockPageStg.
    pub blk_cap: u64,
    /// Number of distinct page sizes.
    pub page_sizes: usize,
    /// Maximum divisor — NOTE(review): semantics not visible in this file;
    /// used by BlockPageStg.
    pub max_div: usize,
}
594
impl Default for Limits {
    /// Default limits.
    fn default() -> Self {
        Self {
            af_lim: atom_file::Limits::default(),
            // 27720 = lcm(1..=12) — presumably chosen so a block divides
            // evenly for every divisor up to max_div (12). TODO confirm.
            blk_cap: 27720,
            page_sizes: 7,
            max_div: 12,
        }
    }
}