1#![deny(missing_docs)]
37
38use atom_file::{Data, Storage};
39use heap::GHeap;
40pub use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
41use std::sync::{Arc, Mutex, RwLock};
42
43mod block;
44mod blockpagestg;
45mod dividedstg;
46mod heap;
47mod util;
48
49pub use blockpagestg::BlockPageStg;
50
51#[cfg(feature = "pstd")]
52use pstd::collections::BTreeMap;
53
54#[cfg(not(feature = "pstd"))]
55use std::collections::BTreeMap;
56
/// Selects what [`AccessPagedData::save`] does with the pending changes.
#[derive(PartialEq, Eq, PartialOrd, Clone, Copy, Debug)]
pub enum SaveOp {
    /// Save the page storage and end the current write.
    Save,
    /// Roll back the page storage.
    RollBack,
}
65
66pub trait PageStorage: Send + Sync {
68 fn is_new(&self) -> bool;
70 fn info(&self) -> Box<dyn PageStorageInfo>;
72 fn new_page(&mut self) -> u64;
74 fn drop_page(&mut self, pn: u64);
76 fn set_page(&mut self, pn: u64, data: Data);
78 fn get_page(&self, pn: u64) -> Data;
80 fn size(&self, pn: u64) -> usize;
82 fn save(&mut self);
84 fn rollback(&mut self);
86 fn wait_complete(&self);
88 #[cfg(feature = "verify")]
89 fn get_free(&mut self) -> (crate::HashSet<u64>, u64);
91 #[cfg(feature = "renumber")]
92 fn renumber(&mut self, pn: u64) -> u64;
94 #[cfg(feature = "renumber")]
95 fn load_free_pages(&mut self) -> Option<u64>;
97 #[cfg(feature = "renumber")]
98 fn set_alloc_pn(&mut self, target: u64);
100}
101
/// Information about the page sizes supported by a [`PageStorage`].
///
/// Page sizes are identified by an index; `sizes()` is the number of sizes and
/// `size(sizes())` is treated as the largest page size.
pub trait PageStorageInfo: Send + Sync {
    /// Number of distinct page sizes.
    fn sizes(&self) -> usize;
    /// Maps a byte size to a page-size index.
    ///
    /// NOTE(review): presumably the index of the smallest page size that can
    /// hold `size` bytes — confirm in implementors.
    fn index(&self, size: usize) -> usize;
    /// Page size for index `ix`.
    fn size(&self, ix: usize) -> usize;
    /// Largest page size, i.e. the size for index `sizes()`.
    fn max_size_page(&self) -> usize {
        self.size(self.sizes())
    }
    /// Page size used for a page of about half the maximum size.
    ///
    /// Half the maximum size, less 50 bytes, is rounded through `index`/`size`.
    /// NOTE(review): `max_size_page() / 2 - 50` underflows (panics in debug
    /// builds) if the maximum page size is below 100 bytes.
    fn half_size_page(&self) -> usize {
        self.size(self.index(self.max_size_page() / 2 - 50))
    }
    /// Returns `true` if shrinking a page of `size` bytes by `saving` bytes
    /// would move it to a smaller size index.
    ///
    /// `saving` must not exceed `size` (the subtraction would underflow).
    fn compress(&self, size: usize, saving: usize) -> bool {
        self.index(size - saving) < self.index(size)
    }
}
123
124type HX = u32; type Heap = GHeap<u64, u64, HX>;
126
127type PageInfoPtr = Arc<Mutex<PageInfo>>;
129
130pub struct PageInfo {
132 pub current: Option<Data>,
134 pub history: BTreeMap<u64, Data>,
137 pub usage: u64,
139 pub hx: HX,
141}
142
143impl PageInfo {
144 fn new() -> PageInfoPtr {
145 Arc::new(Mutex::new(PageInfo {
146 current: None,
147 history: BTreeMap::new(),
148 usage: 0,
149 hx: HX::MAX,
150 }))
151 }
152
153 fn inc_usage(&mut self, lpnum: u64, ah: &mut Heap) {
155 self.usage += 1;
156 if self.hx == HX::MAX {
157 self.hx = ah.insert(lpnum, self.usage);
158 } else {
159 ah.modify(self.hx, self.usage);
160 }
161 }
162
163 fn get_data(&mut self, lpnum: u64, a: &AccessPagedData) -> (Data, usize) {
167 if !a.writer
168 && let Some((_k, v)) = self.history.range(a.time..).next()
169 {
170 return (v.clone(), 0);
171 }
172
173 if let Some(p) = &self.current {
174 return (p.clone(), 0);
175 }
176
177 let ps = a.spd.ps.read().unwrap();
179 let data = ps.get_page(lpnum);
180 self.current = Some(data.clone());
181 let len = data.len();
182 (data, len)
183 }
184
185 fn set_data(&mut self, time: u64, old: Data, data: Data, do_history: bool) -> (usize, usize) {
188 if do_history {
189 self.history.insert(time, old);
190 }
191 let old = if let Some(x) = &self.current {
192 x.len()
193 } else {
194 0
195 };
196 let new = data.len();
197 self.current = if new == 0 { None } else { Some(data) };
198 (old, new)
199 }
200
201 fn trim(&mut self, t: u64, start: u64) -> bool {
204 let first = self.history_start(t);
205 if first >= start {
206 self.history.remove(&t);
208 false
209 } else {
210 true
211 }
212 }
213
214 fn history_start(&self, t: u64) -> u64 {
216 if let Some((k, _)) = self.history.range(..t).next_back() {
217 *k + 1
218 } else {
219 0
220 }
221 }
222}
223
224#[derive(Default)]
226pub struct Stash {
227 time: u64,
229 pub pages: HashMap<u64, PageInfoPtr>,
231 rdrs: BTreeMap<u64, usize>,
233 vers: BTreeMap<u64, HashSet<u64>>,
235 pub total: i64, pub mem_limit: usize,
239 min: Heap,
241 pub read: u64,
243 pub miss: u64,
245}
246
247impl Stash {
248 fn set(&mut self, lpnum: u64, old: Data, data: Data) {
250 let time = self.time;
251 let u = self.vers.entry(time).or_default();
252 let do_history = u.insert(lpnum);
253 let p = self.get_pinfo(lpnum);
254 let diff = p.lock().unwrap().set_data(time, old, data, do_history);
255 self.delta(diff, false, false);
256 }
257
258 fn get_pinfo(&mut self, lpnum: u64) -> PageInfoPtr {
260 let p = self
261 .pages
262 .entry(lpnum)
263 .or_insert_with(PageInfo::new)
264 .clone();
265 p.lock().unwrap().inc_usage(lpnum, &mut self.min);
266 self.read += 1;
267 p
268 }
269
270 fn begin_read(&mut self) -> u64 {
272 let time = self.time;
273 let n = self.rdrs.entry(time).or_insert(0);
274 *n += 1;
275 time
276 }
277
278 fn end_read(&mut self, time: u64) {
280 let n = self.rdrs.get_mut(&time).unwrap();
281 *n -= 1;
282 if *n == 0 {
283 self.rdrs.remove(&time);
284 self.trim(time);
285 }
286 }
287
288 fn end_write(&mut self) -> usize {
291 let result = if let Some(u) = self.vers.get(&self.time) {
292 u.len()
293 } else {
294 0
295 };
296 let t = self.time;
297 self.time = t + 1;
298 self.trim(t);
299 result
300 }
301
302 fn trim(&mut self, time: u64) {
304 let (s, r) = (self.start(time), self.retain(time));
305 if s != r {
306 let mut empty = Vec::<u64>::new();
307 for (t, pl) in self.vers.range_mut(s..r) {
308 pl.retain(|pnum| {
309 let p = self.pages.get(pnum).unwrap();
310 p.lock().unwrap().trim(*t, s)
311 });
312 if pl.is_empty() {
313 empty.push(*t);
314 }
315 }
316 for t in empty {
317 self.vers.remove(&t);
318 }
319 }
320 }
321
322 fn start(&self, time: u64) -> u64 {
324 if let Some((t, _n)) = self.rdrs.range(..time).next_back() {
325 1 + *t
326 } else {
327 0
328 }
329 }
330
331 fn retain(&self, time: u64) -> u64 {
333 if let Some((t, _n)) = self.rdrs.range(time..).next() {
334 *t
335 } else {
336 self.time
337 }
338 }
339
340 fn delta(&mut self, d: (usize, usize), miss: bool, trim: bool) {
342 if miss {
343 self.miss += 1;
344 }
345 self.total += d.1 as i64 - d.0 as i64;
346 if trim {
347 self.trim_cache();
348 }
349 }
350
351 pub fn trim_cache(&mut self) {
353 while self.total > self.mem_limit as i64 && self.min.len() > 0 {
354 let lpnum = self.min.pop();
355 let mut p = self.pages.get(&lpnum).unwrap().lock().unwrap();
356 p.hx = HX::MAX;
357 if let Some(data) = &p.current {
358 self.total -= data.len() as i64;
359 p.current = None;
360 }
361 }
362 }
363
364 pub fn cached(&self) -> usize {
366 self.min.len() as usize
367 }
368}
369
370pub struct SharedPagedData {
372 pub ps: RwLock<Box<dyn PageStorage>>,
374 pub stash: Mutex<Stash>,
376 pub psi: Box<dyn PageStorageInfo>,
378}
379
380impl SharedPagedData {
381 pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
399 let limits = crate::Limits::default();
400 Self::new_from_ps(crate::blockpagestg::BlockPageStg::new(stg, &limits))
401 }
402
403 pub fn new_from_ps(ps: Box<dyn PageStorage>) -> Arc<Self> {
405 let stash = Stash {
407 mem_limit: 10 * 1024 * 1024,
408 ..Default::default()
409 };
410 let psi = ps.info();
411 Arc::new(Self {
412 stash: Mutex::new(stash),
413 ps: RwLock::new(ps),
414 psi,
415
416 })
417 }
418
419 pub fn new_reader(self: &Arc<Self>) -> AccessPagedData
421 {
422 let time = self.stash.lock().unwrap().begin_read();
423 AccessPagedData {
424 writer: false,
425 time,
426 spd: self.clone(),
427 }
428 }
429
430 pub fn new_writer(self: &Arc<Self>) -> AccessPagedData{
432 AccessPagedData {
433 writer: true,
434 time: 0,
435 spd: self.clone(),
436 }
437 }
438
439 pub fn wait_complete(&self) {
441 self.ps.read().unwrap().wait_complete();
442 }
443}
444
445pub struct AccessPagedData {
447 writer: bool,
448 time: u64,
449 pub spd: Arc<SharedPagedData>,
451}
452
453impl AccessPagedData {
454 #[deprecated(note="use SharedPagedData::new_reader instead")]
456 pub fn new_reader(spd: Arc<SharedPagedData>) -> Self {
457 let time = spd.stash.lock().unwrap().begin_read();
458 AccessPagedData {
459 writer: false,
460 time,
461 spd,
462 }
463 }
464
465 #[deprecated(note="use SharedPagedData::new_writer instead")]
467 pub fn new_writer(spd: Arc<SharedPagedData>) -> Self {
468 AccessPagedData {
469 writer: true,
470 time: 0,
471 spd,
472 }
473 }
474
475 pub fn stash(&self) -> std::sync::MutexGuard<'_, Stash> {
477 self.spd.stash.lock().unwrap()
478 }
479
480 pub fn get_data(&self, lpnum: u64) -> Data {
482 let pinfo = self.stash().get_pinfo(lpnum);
484
485 let (data, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);
487
488 if loaded > 0 {
489 self.stash().delta((0, loaded), true, true);
490 }
491 data
492 }
493
494 pub fn set_data(&self, lpnum: u64, data: Data) {
496 debug_assert!(self.writer);
497
498 let pinfo = self.stash().get_pinfo(lpnum);
500
501 let (old, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);
503
504 {
506 let s = &mut *self.stash();
507 if loaded > 0 {
508 s.delta((0, loaded), true, false);
509 }
510 s.set(lpnum, old, data.clone());
511 s.trim_cache();
512 }
513
514 if !data.is_empty() {
516 self.spd.ps.write().unwrap().set_page(lpnum, data);
517 } else {
518 self.spd.ps.write().unwrap().drop_page(lpnum);
519 }
520 }
521
522 pub fn alloc_page(&self) -> u64 {
524 debug_assert!(self.writer);
525 self.spd.ps.write().unwrap().new_page()
526 }
527
528 pub fn free_page(&self, lpnum: u64) {
530 self.set_data(lpnum, Data::default());
531 }
532
533 pub fn is_new(&self) -> bool {
535 self.writer && self.spd.ps.read().unwrap().is_new()
536 }
537
538 pub fn compress(&self, size: usize, saving: usize) -> bool {
540 debug_assert!(self.writer);
541 self.spd.psi.compress(size, saving)
542 }
543
544 pub fn save(&self, op: SaveOp) -> usize {
546 debug_assert!(self.writer);
547 match op {
548 SaveOp::Save => {
549 self.spd.ps.write().unwrap().save();
550 self.stash().end_write()
551 }
552 SaveOp::RollBack => {
553 self.spd.ps.write().unwrap().rollback();
556 0
557 }
558 }
559 }
560
561 #[cfg(feature = "renumber")]
563 pub fn renumber_page(&self, lpnum: u64) -> u64 {
564 assert!(self.writer);
565 let data = self.get_data(lpnum);
566 self.stash().set(lpnum, data.clone(), Data::default());
567 let lpnum2 = self.spd.ps.write().unwrap().renumber(lpnum);
568 debug_assert!(
569 self.stash()
570 .get_pinfo(lpnum2)
571 .lock()
572 .unwrap()
573 .current
574 .is_none()
575 );
576 let old2 = self.get_data(lpnum2);
577 self.stash().set(lpnum2, old2, data);
578 lpnum2
579 }
580}
581
582impl Drop for AccessPagedData {
583 fn drop(&mut self) {
584 if !self.writer {
585 self.stash().end_read(self.time);
586 }
587 }
588}
589
590#[non_exhaustive]
596pub struct Limits {
597 pub af_lim: atom_file::Limits,
599 pub blk_cap: u64,
601 pub page_sizes: usize,
603 pub max_div: usize,
605}
606
607impl Default for Limits {
608 fn default() -> Self {
609 Self {
610 af_lim: atom_file::Limits::default(),
611 blk_cap: 27720,
612 page_sizes: 7,
613 max_div: 12,
614 }
615 }
616}