// page_store/lib.rs
1//! [`SharedPagedData`] manages numbered [`Data`] pages, which can be shared by multiple processes.
2//!
3//! [`PageStorageInfo`] has information about the page sizes available. The default maximum page size is 4612.
4//!
//! # Test example
6//!
7//! ```
8//!     use page_store::{SharedPagedData,SaveOp};
9//!     use atom_file::{MemFile,Data};
10//!     use std::sync::Arc;
11//!
12//!     let spd = SharedPagedData::new(MemFile::new());
13//!     println!( "Number of page sizes={}", spd.psi.sizes() );
14//!     println!( "Max page size={}", spd.psi.max_size_page() );
15//!     let w = spd.new_writer();
16//!     let pnum : u64 = w.alloc_page();
17//!     w.set_data( pnum, Arc::new( vec![1,2,3,4] ) );
18//!     w.save( SaveOp::Save );
19//!     let r = spd.new_reader();
20//!     let mut d : Data = w.get_data( pnum );
21//!     assert!( *d == vec![1,2,3,4] );
22//!     let md = Arc::make_mut(&mut d);
23//!     md[0] = 2;
24//!     w.set_data( pnum, d );
25//!     w.save( SaveOp::Save );
26//!     let d : Data = w.get_data( pnum );
27//!     assert!( *d == vec![2,2,3,4] );
28//!     let d : Data = r.get_data( pnum );
29//!     assert!( *d == vec![1,2,3,4] ); // Reader still sees "old" data.
30//! ```
31
32use atom_file::{Data, Storage};
33use heap::GHeap;
34pub use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
35use std::sync::{Arc, Mutex, RwLock};
36
37mod block;
38mod blockpagestg;
39mod dividedstg;
40mod heap;
41mod util;
42
43pub use blockpagestg::BlockPageStg;
44
45#[cfg(feature = "pstd")]
46use pstd::collections::BTreeMap;
47
48#[cfg(not(feature = "pstd"))]
49use std::collections::BTreeMap;
50
/// Save or Rollback.
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone, Copy)]
pub enum SaveOp {
    /// Commit changes to the underlying storage.
    Save,
    /// Undo page allocations made since the last save.
    RollBack,
}
57
/// Interface for page storage.
///
/// Implementations persist numbered pages of bytes and support
/// save/rollback of changes ( see [`BlockPageStg`] ).
pub trait PageStorage: Send + Sync {
    /// Is the underlying storage new?
    fn is_new(&self) -> bool;
    /// Information about page sizes.
    fn info(&self) -> Box<dyn PageStorageInfo>;
    /// Make a new page, result is page number.
    fn new_page(&mut self) -> u64;
    /// Drop page number.
    fn drop_page(&mut self, pn: u64);
    /// Set contents of page.
    fn set_page(&mut self, pn: u64, data: Data);
    /// Get contents of page.
    fn get_page(&self, pn: u64) -> Data;
    /// Get page size (for repacking).
    fn size(&self, pn: u64) -> usize;
    /// Save pages to underlying storage.
    fn save(&mut self);
    /// Undo changes since last save ( but set_page/renumber cannot be undone, only new_page and drop_page can be undone ).
    fn rollback(&mut self);
    /// Wait until save is complete.
    fn wait_complete(&self);
    #[cfg(feature = "verify")]
    /// Get set of free pages and number of pages ever allocated ( for VERIFY builtin function ).
    fn get_free(&mut self) -> (crate::HashSet<u64>, u64);
    #[cfg(feature = "renumber")]
    /// Renumber page.
    fn renumber(&mut self, pn: u64) -> u64;
    #[cfg(feature = "renumber")]
    /// Load free pages in preparation for page renumbering. Returns number of used pages or None if there are no free pages.
    fn load_free_pages(&mut self) -> Option<u64>;
    #[cfg(feature = "renumber")]
    /// Final part of page renumber operation.
    fn set_alloc_pn(&mut self, target: u64);
}
93
/// Information about the page sizes available from a page storage.
pub trait PageStorageInfo: Send + Sync {
    /// Number of different page sizes.
    fn sizes(&self) -> usize;
    /// Size index for given page size.
    fn index(&self, size: usize) -> usize;
    /// Page size for ix ( 1-based ix must be <= sizes() ).
    fn size(&self, ix: usize) -> usize;
    /// Maximum size page ( the size at the last index ).
    fn max_size_page(&self) -> usize {
        let top = self.sizes();
        self.size(top)
    }
    /// Half size page ( size bucket holding slightly less than half the maximum ).
    fn half_size_page(&self) -> usize {
        let target = self.max_size_page() / 2 - 50;
        self.size(self.index(target))
    }
    /// Is it worth compressing a page of given size by saving:
    /// true when the saving moves the page into a smaller size bucket.
    fn compress(&self, size: usize, saving: usize) -> bool {
        self.index(size) > self.index(size - saving)
    }
}
115
/// Heap index type. Typical 8M cache will have 1K x 8KB pages, so 10 bits is typical, 32 should be plenty.
type HX = u32;
/// Heap keyed by page number with usage count as priority ( used by Stash to find least-used page ).
type Heap = GHeap<u64, u64, HX>;

/// ```Arc<Mutex<PageInfo>>```
type PageInfoPtr = Arc<Mutex<PageInfo>>;
121
/// Information for a page, including historic data.
pub struct PageInfo {
    /// Current data for the page ( None implies it is stored in underlying file ).
    pub current: Option<Data>,
    /// Historic data for the page. Has data for page at specified time.
    /// A copy is made prior to an update, so get looks forward from access time.
    pub history: BTreeMap<u64, Data>,
    /// How many times has the page been used.
    pub usage: u64,
    /// Heap index ( HX::MAX means the page is not currently in the usage heap ).
    pub hx: HX,
}
134
impl PageInfo {
    /// Construct a new, empty PageInfo: no cached data, no history, not yet in the usage heap.
    fn new() -> PageInfoPtr {
        Arc::new(Mutex::new(PageInfo {
            current: None,
            history: BTreeMap::new(),
            usage: 0,
            // HX::MAX marks "not in the heap" ( see inc_usage / trim_cache ).
            hx: HX::MAX,
        }))
    }

    /// Increase usage, inserting into or updating the usage heap.
    fn inc_usage(&mut self, lpnum: u64, ah: &mut Heap) {
        self.usage += 1;
        if self.hx == HX::MAX {
            // Not in the heap yet: insert and remember the heap index.
            self.hx = ah.insert(lpnum, self.usage);
        } else {
            // Already in the heap: update priority in place.
            ah.modify(self.hx, self.usage);
        }
    }

    /// Get the Data for the page, checking history if not a writer.
    /// Reads Data from file if necessary.
    /// Result is Data and size of loaded data ( cache delta ).
    fn get_data(&mut self, lpnum: u64, a: &AccessPagedData) -> (Data, usize) {
        // A reader takes the first history copy at or after its access time
        // ( copies are made prior to updates, so lookup is forward ).
        if !a.writer
            && let Some((_k, v)) = self.history.range(a.time..).next()
        {
            return (v.clone(), 0);
        }

        // Cached current data: nothing loaded, cache delta is 0.
        if let Some(p) = &self.current {
            return (p.clone(), 0);
        }

        // Get data from page storage and cache it; caller accounts for len.
        let ps = a.spd.ps.read().unwrap();
        let data = ps.get_page(lpnum);
        self.current = Some(data.clone());
        let len = data.len();
        (data, len)
    }

    /// Set the page data, updating the history using the specified time and old data.
    /// Result is the cache-size delta as ( old cached size, new size ).
    fn set_data(&mut self, time: u64, old: Data, data: Data, do_history: bool) -> (usize, usize) {
        if do_history {
            // First update at this time: keep a copy for readers at earlier times.
            self.history.insert(time, old);
        }
        // Shadowing: `old` is now the size of the previously cached data ( 0 if none ).
        let old = if let Some(x) = &self.current {
            x.len()
        } else {
            0
        };
        let new = data.len();
        // Empty data means the page was dropped: clear the cache entry.
        self.current = if new == 0 { None } else { Some(data) };
        (old, new)
    }

    /// Trim entry for time t if it no longer needs to be retained, returning whether the entry was retained.
    /// start is the start of the range of times for which no readers exist.
    fn trim(&mut self, t: u64, start: u64) -> bool {
        let first = self.history_start(t);
        if first >= start {
            // There is no reader that can read copy for time t, so copy can be removed.
            self.history.remove(&t);
            false
        } else {
            true
        }
    }

    /// Returns the earliest time that would return the page for the specified time.
    fn history_start(&self, t: u64) -> u64 {
        // Entry t serves all times after the previous history entry's key.
        if let Some((k, _)) = self.history.range(..t).next_back() {
            *k + 1
        } else {
            0
        }
    }
}
215
/// Central store of data.
///
/// Guarded by a Mutex in [`SharedPagedData`]; tracks cached page data,
/// historic copies for readers, and cache-size accounting.
#[derive(Default)]
pub struct Stash {
    /// Write time - number of writes.
    time: u64,
    /// Page number -> page info.
    pub pages: HashMap<u64, PageInfoPtr>,
    /// Time -> reader count. Number of readers for given time.
    rdrs: BTreeMap<u64, usize>,
    /// Time -> set of page numbers. Page copies held for given time.
    vers: BTreeMap<u64, HashSet<u64>>,
    /// Total size of current pages.
    pub total: i64, // Use i64 to avoid problems with overflow.
    /// trim_cache reduces total to mem_limit (or below).
    pub mem_limit: usize,
    /// Tracks loaded page with smallest usage ( eviction candidate ).
    min: Heap,
    /// Total number of page accesses.
    pub read: u64,
    /// Total number of misses ( data was not already loaded ).
    pub miss: u64,
}
238
impl Stash {
    /// Set the value of the specified page for the current time.
    fn set(&mut self, lpnum: u64, old: Data, data: Data) {
        let time = self.time;
        let u = self.vers.entry(time).or_default();
        // Only the first update of a page at a given time saves a history copy.
        let do_history = u.insert(lpnum);
        let p = self.get_pinfo(lpnum);
        let diff = p.lock().unwrap().set_data(time, old, data, do_history);
        self.delta(diff, false, false);
    }

    /// Get the PageInfoPtr for the specified page and note the page as used.
    fn get_pinfo(&mut self, lpnum: u64) -> PageInfoPtr {
        let p = self
            .pages
            .entry(lpnum)
            .or_insert_with(PageInfo::new)
            .clone();
        // Bump usage so trim_cache evicts least-used pages first.
        p.lock().unwrap().inc_usage(lpnum, &mut self.min);
        self.read += 1;
        p
    }

    /// Register that there is a client reading. The result is the current time.
    fn begin_read(&mut self) -> u64 {
        let time = self.time;
        let n = self.rdrs.entry(time).or_insert(0);
        *n += 1;
        time
    }

    /// Register that the read at the specified time has ended. Stashed pages may be freed.
    fn end_read(&mut self, time: u64) {
        let n = self.rdrs.get_mut(&time).unwrap();
        *n -= 1;
        if *n == 0 {
            // Last reader at this time has gone: history kept for it can be trimmed.
            self.rdrs.remove(&time);
            self.trim(time);
        }
    }

    /// Register that an update operation has completed. Time is incremented.
    /// Stashed pages may be freed. Returns number of pages updated.
    fn end_write(&mut self) -> usize {
        let result = if let Some(u) = self.vers.get(&self.time) {
            u.len()
        } else {
            0
        };
        let t = self.time;
        self.time = t + 1;
        self.trim(t);
        result
    }

    /// Trim historic data that is no longer required.
    fn trim(&mut self, time: u64) {
        // [s, r) is the range of times around `time` with no registered readers.
        let (s, r) = (self.start(time), self.retain(time));
        if s != r {
            let mut empty = Vec::<u64>::new();
            for (t, pl) in self.vers.range_mut(s..r) {
                // Keep only pages whose history entry at time t is still needed.
                pl.retain(|pnum| {
                    let p = self.pages.get(pnum).unwrap();
                    p.lock().unwrap().trim(*t, s)
                });
                if pl.is_empty() {
                    empty.push(*t);
                }
            }
            // Drop time entries whose page set became empty.
            for t in empty {
                self.vers.remove(&t);
            }
        }
    }

    /// Calculate the start of the range of times for which there are no readers.
    fn start(&self, time: u64) -> u64 {
        if let Some((t, _n)) = self.rdrs.range(..time).next_back() {
            1 + *t
        } else {
            0
        }
    }

    /// Calculate the end of the range of times for which there are no readers.
    fn retain(&self, time: u64) -> u64 {
        if let Some((t, _n)) = self.rdrs.range(time..).next() {
            *t
        } else {
            self.time
        }
    }

    /// Adjust total by the ( old, new ) size pair; optionally count a miss and trim the cache.
    fn delta(&mut self, d: (usize, usize), miss: bool, trim: bool) {
        if miss {
            self.miss += 1;
        }
        self.total += d.1 as i64 - d.0 as i64;
        if trim {
            self.trim_cache();
        }
    }

    /// Trim cached data to configured limit.
    pub fn trim_cache(&mut self) {
        // Evict least-used pages until total <= mem_limit or nothing is cached.
        while self.total > self.mem_limit as i64 && self.min.len() > 0 {
            let lpnum = self.min.pop();
            let mut p = self.pages.get(&lpnum).unwrap().lock().unwrap();
            // HX::MAX marks the page as no longer present in the usage heap.
            p.hx = HX::MAX;
            if let Some(data) = &p.current {
                self.total -= data.len() as i64;
                p.current = None;
            }
        }
    }

    /// Return the number of pages currently cached.
    pub fn cached(&self) -> usize {
        self.min.len() as usize
    }
}
361
/// Allows pages to be shared to allow concurrent readers.
///
/// Writers take the `ps` write lock when updating storage; the stash keeps
/// historic page copies so each reader sees data as of its access time.
pub struct SharedPagedData {
    /// Permanent storage of pages.
    pub ps: RwLock<Box<dyn PageStorage>>,
    /// Stash of pages.
    pub stash: Mutex<Stash>,
    /// Info on page sizes.
    pub psi: Box<dyn PageStorageInfo>,
}
371
372impl SharedPagedData {
373    /*
374    #[cfg(feature = "compact")]
375    /// Construct default SharedPageData ( default depends on compact feature ).
376    pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
377        const EP_SIZE: usize = 1024; // Size of an extension page.
378        const EP_MAX: usize = 16; // Maximum number of extension pages.
379        const SP_SIZE: usize = (EP_MAX + 1) * 8; // =136. Starter page size.
380
381        Self::new_from_ps(Box::new(crate::compact::CompactFile::new(
382            stg, SP_SIZE, EP_SIZE,
383        )))
384    }
385
386    #[cfg(not(feature = "compact"))]
387    */
388
389    /// Construct default SharedPageData ( default depends on compact feature ).
390    pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
391        let limits = crate::Limits::default();
392        Self::new_from_ps(crate::blockpagestg::BlockPageStg::new(stg, &limits))
393    }
394
395    /// Construct SharedPageData based on specified PageStorage ( e.g. BlockPageStg )
396    pub fn new_from_ps(ps: Box<dyn PageStorage>) -> Arc<Self> {
397        // Set a default stash memory limit of 10 MB.
398        let stash = Stash {
399            mem_limit: 10 * 1024 * 1024,
400            ..Default::default()
401        };
402        let psi = ps.info();
403        Arc::new(Self {
404            stash: Mutex::new(stash),
405            ps: RwLock::new(ps),
406            psi,
407     
408        })
409    }
410
411    /// Get read access to a virtual read-only copy of the pages.
412    pub fn new_reader(self: &Arc<Self>) -> AccessPagedData
413    {
414        let time = self.stash.lock().unwrap().begin_read();
415        AccessPagedData {
416            writer: false,
417            time,
418            spd: self.clone(),
419        }
420    }
421
422    /// Get write access to the pages.
423    pub fn new_writer(self: &Arc<Self>) -> AccessPagedData{
424        AccessPagedData {
425            writer: true,
426            time: 0,
427            spd: self.clone(),
428        }
429    }
430
431    /// Wait until current commits have been written.
432    pub fn wait_complete(&self) {
433        self.ps.read().unwrap().wait_complete();
434    }
435}
436
/// Access to shared paged data.
pub struct AccessPagedData {
    /// True for a writer; a reader ( false ) sees data as of `time`.
    writer: bool,
    /// Stash time at which this reader was registered ( 0 for writers ).
    time: u64,
    /// Shared Page Data.
    pub spd: Arc<SharedPagedData>,
}
444
impl AccessPagedData {
    /// Construct access to a virtual read-only copy of the pages.
    #[deprecated(note="use SharedPagedData::new_reader instead")]
    pub fn new_reader(spd: Arc<SharedPagedData>) -> Self {
        // Register as a reader at the current time; end_read is called on Drop.
        let time = spd.stash.lock().unwrap().begin_read();
        AccessPagedData {
            writer: false,
            time,
            spd,
        }
    }

    /// Construct write access to the pages.
    #[deprecated(note="use SharedPagedData::new_writer instead")]
    pub fn new_writer(spd: Arc<SharedPagedData>) -> Self {
        AccessPagedData {
            writer: true,
            time: 0,
            spd,
        }
    }

    /// Get locked guard of stash.
    pub fn stash(&self) -> std::sync::MutexGuard<'_, Stash> {
        self.spd.stash.lock().unwrap()
    }

    /// Get the Data for the specified page.
    pub fn get_data(&self, lpnum: u64) -> Data {
        // Get page info ( stash lock is released at end of statement ).
        let pinfo = self.stash().get_pinfo(lpnum);

        // Read the page data ( may load from underlying storage ).
        let (data, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        // If data was loaded, account for the extra cached bytes and trim the cache.
        if loaded > 0 {
            self.stash().delta((0, loaded), true, true);
        }
        data
    }

    /// Set the data of the specified page.
    pub fn set_data(&self, lpnum: u64, data: Data) {
        debug_assert!(self.writer);

        // Get copy of current data ( needed for the history copy readers may see ).
        let pinfo = self.stash().get_pinfo(lpnum);

        // Read the page data.
        let (old, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);

        // Update the stash ( ensures any readers will not attempt to read the file ).
        {
            let s = &mut *self.stash();
            if loaded > 0 {
                s.delta((0, loaded), true, false);
            }
            s.set(lpnum, old, data.clone());
            s.trim_cache();
        }

        // Write data to underlying file; empty data means the page is dropped.
        if !data.is_empty() {
            self.spd.ps.write().unwrap().set_page(lpnum, data);
        } else {
            self.spd.ps.write().unwrap().drop_page(lpnum);
        }
    }

    /// Allocate a page.
    pub fn alloc_page(&self) -> u64 {
        debug_assert!(self.writer);
        self.spd.ps.write().unwrap().new_page()
    }

    /// Free a page ( implemented by setting empty data, see set_data ).
    pub fn free_page(&self, lpnum: u64) {
        self.set_data(lpnum, Data::default());
    }

    /// Is the underlying file new ( so needs to be initialised ).
    pub fn is_new(&self) -> bool {
        self.writer && self.spd.ps.read().unwrap().is_new()
    }

    /// Check whether compressing a page is worthwhile.
    pub fn compress(&self, size: usize, saving: usize) -> bool {
        debug_assert!(self.writer);
        self.spd.psi.compress(size, saving)
    }

    /// Commit changes to underlying file ( or rollback page allocations ).
    pub fn save(&self, op: SaveOp) -> usize {
        debug_assert!(self.writer);
        match op {
            SaveOp::Save => {
                self.spd.ps.write().unwrap().save();
                // Advance stash time; returns number of pages updated.
                self.stash().end_write()
            }
            SaveOp::RollBack => {
                // Note: rollback happens before any pages are updated.
                // However page allocations need to be rolled back.
                self.spd.ps.write().unwrap().rollback();
                0
            }
        }
    }

    /// Renumber a page.
    #[cfg(feature = "renumber")]
    pub fn renumber_page(&self, lpnum: u64) -> u64 {
        // Hard assert ( not debug_assert ): renumbering as a reader must never happen.
        assert!(self.writer);
        let data = self.get_data(lpnum);
        // Record the old page as emptied in the stash.
        self.stash().set(lpnum, data.clone(), Data::default());
        let lpnum2 = self.spd.ps.write().unwrap().renumber(lpnum);
        debug_assert!(
            self.stash()
                .get_pinfo(lpnum2)
                .lock()
                .unwrap()
                .current
                .is_none()
        );
        // Move the data to the new page number in the stash.
        let old2 = self.get_data(lpnum2);
        self.stash().set(lpnum2, old2, data);
        lpnum2
    }
}
573
574impl Drop for AccessPagedData {
575    fn drop(&mut self) {
576        if !self.writer {
577            self.stash().end_read(self.time);
578        }
579    }
580}
581
/// Memory limits.
#[non_exhaustive]
pub struct Limits {
    /// Atomic file limits.
    pub af_lim: atom_file::Limits,
    /// Block capacity.
    pub blk_cap: u64,
    /// Number of page sizes.
    pub page_sizes: usize,
    /// Largest division of page.
    pub max_div: usize,
}
594
595impl Default for Limits {
596    fn default() -> Self {
597        Self {
598            af_lim: atom_file::Limits::default(),
599            blk_cap: 27720,
600            page_sizes: 7,
601            max_div: 12,
602        }
603    }
604}