1#![deny(missing_docs)]
37
38use atom_file::{Data, Storage};
39use heap::GHeap;
40pub use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
41use std::sync::{Arc, Mutex, RwLock};
42
43mod block;
44mod blockpagestg;
45mod dividedstg;
46mod heap;
47mod util;
48
49pub use blockpagestg::BlockPageStg;
50
51#[cfg(feature = "pstd")]
52use pstd::collections::BTreeMap;
53
54#[cfg(not(feature = "pstd"))]
55use std::collections::BTreeMap;
56
/// Action performed by [`AccessPagedData::save`]: commit or discard outstanding writes.
// `Ord` added alongside `PartialOrd` (clippy: derive_ord_xor_partial_ord);
// `Debug` added so the op can appear in diagnostics. Both are backward compatible.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub enum SaveOp {
    /// Save (commit) outstanding changes.
    Save,
    /// Discard outstanding changes.
    RollBack,
}
65
/// Interface to a store of logical pages of [`Data`], indexed by page number.
pub trait PageStorage: Send + Sync {
    /// Is this a freshly created (empty) storage?
    fn is_new(&self) -> bool;
    /// Information about the page sizes this storage supports.
    fn info(&self) -> Box<dyn PageStorageInfo>;
    /// Allocate a new page, returning its page number.
    fn new_page(&mut self) -> u64;
    /// Drop (free) the specified page.
    fn drop_page(&mut self, pn: u64);
    /// Set the data of the specified page.
    fn set_page(&mut self, pn: u64, data: Data);
    /// Get the data of the specified page.
    fn get_page(&self, pn: u64) -> Data;
    /// Size in bytes of the specified page.
    fn size(&self, pn: u64) -> usize;
    /// Persist outstanding changes.
    fn save(&mut self);
    /// Undo changes since the last save.
    fn rollback(&mut self);
    /// Wait until any in-progress save work is complete.
    fn wait_complete(&self);
    /// Get the set of free pages and (presumably) the page allocation
    /// counter, for verification. TODO confirm second tuple element.
    #[cfg(feature = "verify")]
    fn get_free(&mut self) -> (crate::HashSet<u64>, u64);
    /// Renumber the specified page, returning the new page number.
    #[cfg(feature = "renumber")]
    fn renumber(&mut self, pn: u64) -> u64;
    /// Load the free page list. Return semantics are not evident from this
    /// file — presumably the highest used page number or similar; confirm
    /// against the implementation.
    #[cfg(feature = "renumber")]
    fn load_free_pages(&mut self) -> Option<u64>;
    /// Set the page-number allocation target (used after renumbering).
    #[cfg(feature = "renumber")]
    fn set_alloc_pn(&mut self, target: u64);
}
101
/// Information about the sizes of pages supported by a [`PageStorage`].
pub trait PageStorageInfo: Send + Sync {
    /// The number of distinct page size classes.
    fn sizes(&self) -> usize;

    /// Size-class index for a page of `size` bytes.
    fn index(&self, size: usize) -> usize;

    /// Page size in bytes for size-class index `ix`.
    fn size(&self, ix: usize) -> usize;

    /// The largest supported page size.
    fn max_size_page(&self) -> usize {
        let top = self.sizes();
        self.size(top)
    }

    /// A page size roughly half the maximum.
    fn half_size_page(&self) -> usize {
        // NOTE(review): the 50-byte reduction presumably allows for per-page
        // overhead — confirm against the storage implementation.
        let target = self.max_size_page() / 2 - 50;
        self.size(self.index(target))
    }

    /// Would reducing a page of `size` bytes by `saving` bytes move it into a
    /// smaller size class (making compression worthwhile)?
    fn compress(&self, size: usize, saving: usize) -> bool {
        let shrunk = self.index(size - saving);
        shrunk < self.index(size)
    }
}
123
124type HX = u32; type Heap = GHeap<u64, u64, HX>;
126
127type PageInfoPtr = Arc<Mutex<PageInfo>>;
129
/// Cached state for one logical page.
pub struct PageInfo {
    /// The latest page data, if cached (`None` when evicted or the page is empty).
    pub current: Option<Data>,
    /// Superseded versions of the page data, keyed by the write epoch in which
    /// they were replaced; used to serve readers that started earlier.
    pub history: BTreeMap<u64, Data>,
    /// Access counter, used to order pages for cache eviction.
    pub usage: u64,
    /// Position of this page in the usage heap (`HX::MAX` = not in the heap).
    pub hx: HX,
}
142
impl PageInfo {
    /// Construct a fresh, empty `PageInfo`: nothing cached, no history,
    /// not yet in the usage heap.
    fn new() -> PageInfoPtr {
        Arc::new(Mutex::new(PageInfo {
            current: None,
            history: BTreeMap::new(),
            usage: 0,
            hx: HX::MAX,
        }))
    }

    /// Bump the usage counter and (re)position this page in the usage heap.
    /// `hx == HX::MAX` means the page is not currently in the heap.
    fn inc_usage(&mut self, lpnum: u64, ah: &mut Heap) {
        self.usage += 1;
        if self.hx == HX::MAX {
            self.hx = ah.insert(lpnum, self.usage);
        } else {
            ah.modify(self.hx, self.usage);
        }
    }

    /// Get the page data visible to accessor `a`, plus the number of bytes
    /// loaded from the underlying storage (0 on a cache/history hit).
    fn get_data(&mut self, lpnum: u64, a: &AccessPagedData) -> (Data, usize) {
        // A reader sees the earliest history entry at or after its start
        // epoch, i.e. the page contents as of when the reader began.
        if !a.writer
            && let Some((_k, v)) = self.history.range(a.time..).next()
        {
            return (v.clone(), 0);
        }

        // Cache hit: current data is present.
        if let Some(p) = &self.current {
            return (p.clone(), 0);
        }

        // Cache miss: load from the underlying storage and cache the result.
        // Note: the ps read lock is taken while this PageInfo is locked.
        let ps = a.spd.ps.read().unwrap();
        let data = ps.get_page(lpnum);
        self.current = Some(data.clone());
        let len = data.len();
        (data, len)
    }

    /// Set the page data. When `do_history` is true (first write of this page
    /// in the epoch), `old` is recorded in history at `time` for readers.
    /// Returns (previous cached length, new length) for cache-size accounting.
    fn set_data(&mut self, time: u64, old: Data, data: Data, do_history: bool) -> (usize, usize) {
        if do_history {
            self.history.insert(time, old);
        }
        // Length of whatever was cached before this write (0 if nothing).
        let old = if let Some(x) = &self.current {
            x.len()
        } else {
            0
        };
        let new = data.len();
        // Empty data is represented as None (nothing cached).
        self.current = if new == 0 { None } else { Some(data) };
        (old, new)
    }

    /// Try to discard the history entry recorded at epoch `t`. `start` is the
    /// first epoch no longer covered by any older reader; the entry is removed
    /// when the preceding entry (if any) already serves every remaining older
    /// reader. Returns true if the entry was kept.
    fn trim(&mut self, t: u64, start: u64) -> bool {
        let first = self.history_start(t);
        if first >= start {
            self.history.remove(&t);
            false
        } else {
            true
        }
    }

    /// First epoch whose reads would be served by the history entry at `t`:
    /// one past the preceding history entry, or 0 if there is none.
    fn history_start(&self, t: u64) -> u64 {
        if let Some((k, _)) = self.history.range(..t).next_back() {
            *k + 1
        } else {
            0
        }
    }
}
223
224#[derive(Default)]
226pub struct Stash {
227 time: u64,
229 pub pages: HashMap<u64, PageInfoPtr>,
231 rdrs: BTreeMap<u64, usize>,
233 vers: BTreeMap<u64, HashSet<u64>>,
235 pub total: i64, pub mem_limit: usize,
239 min: Heap,
241 pub read: u64,
243 pub miss: u64,
245}
246
impl Stash {
    /// Record new data for a page in the current write epoch. The first time a
    /// page is written in an epoch, its previous value (`old`) is saved to
    /// history so that ongoing readers still see it.
    fn set(&mut self, lpnum: u64, old: Data, data: Data) {
        let time = self.time;
        let u = self.vers.entry(time).or_default();
        // HashSet::insert returns true only on first insertion for this epoch.
        let do_history = u.insert(lpnum);
        let p = self.get_pinfo(lpnum);
        let diff = p.lock().unwrap().set_data(time, old, data, do_history);
        self.delta(diff, false, false);
    }

    /// Get the `PageInfoPtr` for the specified page, creating it if necessary,
    /// bump its usage in the eviction heap, and count the read.
    fn get_pinfo(&mut self, lpnum: u64) -> PageInfoPtr {
        let p = self
            .pages
            .entry(lpnum)
            .or_insert_with(PageInfo::new)
            .clone();
        p.lock().unwrap().inc_usage(lpnum, &mut self.min);
        self.read += 1;
        p
    }

    /// Register a new reader; returns the epoch at which the read starts.
    fn begin_read(&mut self) -> u64 {
        let time = self.time;
        let n = self.rdrs.entry(time).or_insert(0);
        *n += 1;
        time
    }

    /// Deregister a reader that started at `time`. When the last reader for an
    /// epoch finishes, history that can no longer be observed is trimmed.
    fn end_read(&mut self, time: u64) {
        // Panics if no reader is registered at `time` (unbalanced end_read).
        let n = self.rdrs.get_mut(&time).unwrap();
        *n -= 1;
        if *n == 0 {
            self.rdrs.remove(&time);
            self.trim(time);
        }
    }

    /// End the current write epoch and advance time. Returns the number of
    /// pages updated during the epoch.
    fn end_write(&mut self) -> usize {
        let result = if let Some(u) = self.vers.get(&self.time) {
            u.len()
        } else {
            0
        };
        let t = self.time;
        self.time = t + 1;
        self.trim(t);
        result
    }

    /// Trim history that is no longer needed now that there is no reader at
    /// `time`. Epochs in `start(time)..retain(time)` contain no active reader,
    /// so history entries recorded in that range may be removable.
    fn trim(&mut self, time: u64) {
        let (s, r) = (self.start(time), self.retain(time));
        if s != r {
            let mut empty = Vec::<u64>::new();
            for (t, pl) in self.vers.range_mut(s..r) {
                // Keep only pages whose history entry at `t` is still needed
                // by some remaining reader.
                pl.retain(|pnum| {
                    let p = self.pages.get(pnum).unwrap();
                    p.lock().unwrap().trim(*t, s)
                });
                if pl.is_empty() {
                    empty.push(*t);
                }
            }
            // Remove epochs whose page sets became empty.
            for t in empty {
                self.vers.remove(&t);
            }
        }
    }

    /// First epoch not covered by any reader older than `time`:
    /// one past the latest such reader, or 0 if there is none.
    fn start(&self, time: u64) -> u64 {
        if let Some((t, _n)) = self.rdrs.range(..time).next_back() {
            1 + *t
        } else {
            0
        }
    }

    /// First epoch that must be kept for a reader at or after `time`,
    /// or the current epoch if there is no such reader.
    fn retain(&self, time: u64) -> u64 {
        if let Some((t, _n)) = self.rdrs.range(time..).next() {
            *t
        } else {
            self.time
        }
    }

    /// Adjust the total cached size by `d.1 - d.0` bytes; optionally count a
    /// cache miss and/or trim the cache back under the memory limit.
    fn delta(&mut self, d: (usize, usize), miss: bool, trim: bool) {
        if miss {
            self.miss += 1;
        }
        self.total += d.1 as i64 - d.0 as i64;
        if trim {
            self.trim_cache();
        }
    }

    /// Evict cached page data, in the order pages pop from the usage heap,
    /// until the total cached size is within the memory limit.
    pub fn trim_cache(&mut self) {
        while self.total > self.mem_limit as i64 && self.min.len() > 0 {
            let lpnum = self.min.pop();
            let mut p = self.pages.get(&lpnum).unwrap().lock().unwrap();
            // Mark the page as no longer present in the usage heap.
            p.hx = HX::MAX;
            if let Some(data) = &p.current {
                self.total -= data.len() as i64;
                p.current = None;
            }
        }
    }

    /// Number of pages currently tracked in the usage heap.
    pub fn cached(&self) -> usize {
        self.min.len() as usize
    }
}
369
/// Page storage shared between one writer and multiple concurrent readers.
pub struct SharedPagedData {
    /// The underlying page storage.
    pub ps: RwLock<Box<dyn PageStorage>>,
    /// Cache and multi-version bookkeeping.
    pub stash: Mutex<Stash>,
    /// Information about supported page sizes.
    pub psi: Box<dyn PageStorageInfo>,
}
379
380impl SharedPagedData {
381 pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
399 let limits = crate::Limits::default();
400 Self::new_from_ps(crate::blockpagestg::BlockPageStg::new(stg, &limits))
401 }
402
403 pub fn new_from_ps(ps: Box<dyn PageStorage>) -> Arc<Self> {
405 let stash = Stash {
407 mem_limit: 10 * 1024 * 1024,
408 ..Default::default()
409 };
410 let psi = ps.info();
411 Arc::new(Self {
412 stash: Mutex::new(stash),
413 ps: RwLock::new(ps),
414 psi,
415 })
416 }
417
418 pub fn new_reader(self: &Arc<Self>) -> AccessPagedData {
420 let time = self.stash.lock().unwrap().begin_read();
421 AccessPagedData {
422 writer: false,
423 time,
424 spd: self.clone(),
425 }
426 }
427
428 pub fn new_writer(self: &Arc<Self>) -> AccessPagedData {
430 AccessPagedData {
431 writer: true,
432 time: 0,
433 spd: self.clone(),
434 }
435 }
436
437 pub fn wait_complete(&self) {
439 self.ps.read().unwrap().wait_complete();
440 }
441}
442
/// Access to shared paged data, as either the writer or a reader.
pub struct AccessPagedData {
    /// True for the writer, false for readers.
    writer: bool,
    /// Epoch at which a reader started (0 for the writer).
    time: u64,
    /// The shared paged data being accessed.
    pub spd: Arc<SharedPagedData>,
}
450
impl AccessPagedData {
    /// Construct a read-only accessor to the shared paged data.
    #[deprecated(note = "use SharedPagedData::new_reader instead")]
    pub fn new_reader(spd: Arc<SharedPagedData>) -> Self {
        let time = spd.stash.lock().unwrap().begin_read();
        AccessPagedData {
            writer: false,
            time,
            spd,
        }
    }

    /// Construct a write accessor to the shared paged data.
    #[deprecated(note = "use SharedPagedData::new_writer instead")]
    pub fn new_writer(spd: Arc<SharedPagedData>) -> Self {
        AccessPagedData {
            writer: true,
            time: 0,
            spd,
        }
    }

    /// Lock and return the shared [`Stash`].
    pub fn stash(&self) -> std::sync::MutexGuard<'_, Stash> {
        self.spd.stash.lock().unwrap()
    }

    /// Get the data for the specified page.
    pub fn get_data(&self, lpnum: u64) -> Data {
        // The stash lock is released at the end of this statement, before the
        // page's own lock is taken below.
        let pinfo = self.stash().get_pinfo(lpnum);

        // loaded > 0 means the data had to be fetched from storage.
        let (data, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        // Account for the newly cached bytes and trim the cache if needed.
        if loaded > 0 {
            self.stash().delta((0, loaded), true, true);
        }
        data
    }

    /// Set the data of the specified page. Only valid on the writer.
    pub fn set_data(&self, lpnum: u64, data: Data) {
        debug_assert!(self.writer);

        // Stash lock released before the page lock is taken (as in get_data).
        let pinfo = self.stash().get_pinfo(lpnum);

        // Fetch the old value so it can be saved to history for readers.
        let (old, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        {
            // One stash lock for the accounting, history update and cache trim.
            let s = &mut *self.stash();
            if loaded > 0 {
                s.delta((0, loaded), true, false);
            }
            s.set(lpnum, old, data.clone());
            s.trim_cache();
        }

        // Propagate to the underlying storage; empty data frees the page.
        if !data.is_empty() {
            self.spd.ps.write().unwrap().set_page(lpnum, data);
        } else {
            self.spd.ps.write().unwrap().drop_page(lpnum);
        }
    }

    /// Allocate a page number. Only valid on the writer.
    pub fn alloc_page(&self) -> u64 {
        debug_assert!(self.writer);
        self.spd.ps.write().unwrap().new_page()
    }

    /// Free the specified page by setting its data to empty.
    pub fn free_page(&self, lpnum: u64) {
        self.set_data(lpnum, Data::default());
    }

    /// Is the underlying storage new (used to determine if initialisation is needed)?
    /// Always false for readers.
    pub fn is_new(&self) -> bool {
        self.writer && self.spd.ps.read().unwrap().is_new()
    }

    /// Would reducing a page of `size` bytes by `saving` bytes move it to a
    /// smaller size class? Only valid on the writer.
    pub fn compress(&self, size: usize, saving: usize) -> bool {
        debug_assert!(self.writer);
        self.spd.psi.compress(size, saving)
    }

    /// Commit or roll back changes. Returns the number of pages updated in the
    /// completed epoch (0 for a rollback). Only valid on the writer.
    pub fn save(&self, op: SaveOp) -> usize {
        debug_assert!(self.writer);
        match op {
            SaveOp::Save => {
                self.spd.ps.write().unwrap().save();
                self.stash().end_write()
            }
            SaveOp::RollBack => {
                // NOTE(review): only the underlying storage is rolled back;
                // the stash is untouched — presumably rollback is only used
                // before any writes reach the stash. Confirm with callers.
                self.spd.ps.write().unwrap().rollback();
                0
            }
        }
    }

    /// Renumber the specified page, returning the new page number.
    /// Only valid on the writer.
    #[cfg(feature = "renumber")]
    pub fn renumber_page(&self, lpnum: u64) -> u64 {
        assert!(self.writer);
        // Clear the old page number (its prior data goes to history).
        let data = self.get_data(lpnum);
        self.stash().set(lpnum, data.clone(), Data::default());
        let lpnum2 = self.spd.ps.write().unwrap().renumber(lpnum);
        // The new page number should have nothing cached yet.
        debug_assert!(
            self.stash()
                .get_pinfo(lpnum2)
                .lock()
                .unwrap()
                .current
                .is_none()
        );
        // Move the data across to the new page number.
        let old2 = self.get_data(lpnum2);
        self.stash().set(lpnum2, old2, data);
        lpnum2
    }
}
579
580impl Drop for AccessPagedData {
581 fn drop(&mut self) {
582 if !self.writer {
583 self.stash().end_read(self.time);
584 }
585 }
586}
587
/// Configuration limits for storage (see [`Limits::default`] for defaults).
#[non_exhaustive]
pub struct Limits {
    /// Limits for the underlying atomic file.
    pub af_lim: atom_file::Limits,
    /// Block capacity (default 27720). Units not evident here — TODO confirm
    /// against `blockpagestg`.
    pub blk_cap: u64,
    /// Number of page size classes (default 7).
    pub page_sizes: usize,
    /// Maximum divisions (default 12) — presumably relates to `dividedstg`;
    /// confirm.
    pub max_div: usize,
}
604
605impl Default for Limits {
606 fn default() -> Self {
607 Self {
608 af_lim: atom_file::Limits::default(),
609 blk_cap: 27720,
610 page_sizes: 7,
611 max_div: 12,
612 }
613 }
614}