// page_store/lib.rs
//! [`SharedPagedData`] manages numbered [`Data`] pages, which can be shared by multiple processes.
//!
//! [`PageStorageInfo`] has information about the page sizes available. The default maximum page size is 4612.
//!
//! A cache ( Stash ) of pages is maintained, including a history of each modified page ( where necessary ).
//!
//! If the cache size exceeds a limit, the least used pages are discarded to reduce memory usage.
//!
//! # Test example
//!
//! ```
//!     use page_store::{SharedPagedData,SaveOp};
//!     use atom_file::{MemFile,Data};
//!     use std::sync::Arc;
//!
//!     let spd = SharedPagedData::new(MemFile::new());
//!     println!( "Number of page sizes={}", spd.psi.sizes() );
//!     println!( "Max page size={}", spd.psi.max_size_page() );
//!     let w = spd.new_writer();
//!     let pnum : u64 = w.alloc_page();
//!     w.set_data( pnum, Arc::new( vec![1,2,3,4] ) );
//!     w.save( SaveOp::Save );
//!     let r = spd.new_reader();
//!     let mut d : Data = w.get_data( pnum );
//!     assert!( *d == vec![1,2,3,4] );
//!     let md = Arc::make_mut(&mut d);
//!     md[0] = 2;
//!     w.set_data( pnum, d );
//!     w.save( SaveOp::Save );
//!     let d : Data = w.get_data( pnum );
//!     assert!( *d == vec![2,2,3,4] );
//!     let d : Data = r.get_data( pnum );
//!     assert!( *d == vec![1,2,3,4] ); // Reader still sees "old" data.
//! ```
35
36#![deny(missing_docs)]
37
38use atom_file::{Data, Storage};
39use heap::GHeap;
40pub use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
41use std::sync::{Arc, Mutex, RwLock};
42
43mod block;
44mod blockpagestg;
45mod dividedstg;
46mod heap;
47mod util;
48
49pub use blockpagestg::BlockPageStg;
50
51#[cfg(feature = "pstd")]
52use pstd::collections::BTreeMap;
53
54#[cfg(not(feature = "pstd"))]
55use std::collections::BTreeMap;
56
/// Save or Rollback.
///
/// Passed to [`AccessPagedData::save`] to commit or discard changes.
// Debug and Ord added: the enum is public API (crate denies missing docs and
// exposes this type), so it should be printable in diagnostics, and deriving
// PartialOrd without Ord on a simple field-less enum is needlessly asymmetric.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub enum SaveOp {
    /// Save changes.
    Save,
    /// Roll changes back.
    RollBack,
}
65
/// Interface for page storage.
///
/// Implemented by [`BlockPageStg`]; [`SharedPagedData`] holds a boxed
/// implementation behind an `RwLock` and layers caching/history on top.
pub trait PageStorage: Send + Sync {
    /// Is the underlying storage new?
    fn is_new(&self) -> bool;
    /// Information about page sizes.
    fn info(&self) -> Box<dyn PageStorageInfo>;
    /// Make a new page, result is page number.
    fn new_page(&mut self) -> u64;
    /// Drop page number.
    fn drop_page(&mut self, pn: u64);
    /// Set contents of page.
    fn set_page(&mut self, pn: u64, data: Data);
    /// Get contents of page.
    fn get_page(&self, pn: u64) -> Data;
    /// Get page size (for repacking).
    fn size(&self, pn: u64) -> usize;
    /// Save pages to underlying storage.
    fn save(&mut self);
    /// Undo changes since last save ( but set_page/renumber cannot be undone, only new_page and drop_page can be undone ).
    fn rollback(&mut self);
    /// Wait until save is complete.
    fn wait_complete(&self);
    #[cfg(feature = "verify")]
    /// Get set of free pages and number of pages ever allocated ( for VERIFY builtin function ).
    fn get_free(&mut self) -> (crate::HashSet<u64>, u64);
    #[cfg(feature = "renumber")]
    /// Renumber page.
    fn renumber(&mut self, pn: u64) -> u64;
    #[cfg(feature = "renumber")]
    /// Load free pages in preparation for page renumbering. Returns number of used pages or None if there are no free pages.
    fn load_free_pages(&mut self) -> Option<u64>;
    #[cfg(feature = "renumber")]
    /// Final part of page renumber operation.
    fn set_alloc_pn(&mut self, target: u64);
}
101
/// Information about page sizes.
pub trait PageStorageInfo: Send + Sync {
    /// Number of different page sizes.
    fn sizes(&self) -> usize;
    /// Size index for given page size.
    fn index(&self, size: usize) -> usize;
    /// Page size for ix ( 1-based ix must be <= sizes() ).
    fn size(&self, ix: usize) -> usize;
    /// Maximum size page, i.e. the size at the highest index.
    fn max_size_page(&self) -> usize {
        let top = self.sizes();
        self.size(top)
    }
    /// A page size roughly half the maximum ( 50 bytes of slack below the midpoint ).
    fn half_size_page(&self) -> usize {
        let target = self.max_size_page() / 2 - 50;
        let ix = self.index(target);
        self.size(ix)
    }
    /// Is it worth compressing a page of given size by saving:
    /// true when the reduced size falls into a smaller size class.
    fn compress(&self, size: usize, saving: usize) -> bool {
        let reduced = size - saving;
        self.index(reduced) < self.index(size)
    }
}
123
type HX = u32; // Heap index type. Typical 8M cache will have 1K x 8KB pages, so 10 bits is typical, 32 should be plenty.
/// Heap keyed by logical page number with usage count as priority; used by Stash to find least-used pages.
type Heap = GHeap<u64, u64, HX>;

/// ```Arc<Mutex<PageInfo>>```
type PageInfoPtr = Arc<Mutex<PageInfo>>;
129
/// Information for a page, including historic data.
pub struct PageInfo {
    /// Current data for the page( None implies it is stored in underlying file ).
    /// Also set to None when the page is evicted by Stash::trim_cache.
    pub current: Option<Data>,
    /// Historic data for the page. Has data for page at specified time.
    /// A copy is made prior to an update, so get looks forward from access time.
    pub history: BTreeMap<u64, Data>,
    /// How many times has the page been used.
    pub usage: u64,
    /// Heap index. HX::MAX means the page is not currently in the usage heap.
    pub hx: HX,
}
142
143impl PageInfo {
144    fn new() -> PageInfoPtr {
145        Arc::new(Mutex::new(PageInfo {
146            current: None,
147            history: BTreeMap::new(),
148            usage: 0,
149            hx: HX::MAX,
150        }))
151    }
152
153    /// Increase usage.
154    fn inc_usage(&mut self, lpnum: u64, ah: &mut Heap) {
155        self.usage += 1;
156        if self.hx == HX::MAX {
157            self.hx = ah.insert(lpnum, self.usage);
158        } else {
159            ah.modify(self.hx, self.usage);
160        }
161    }
162
163    /// Get the Data for the page, checking history if not a writer.
164    /// Reads Data from file if necessary.
165    /// Result is Data and size of loaded data ( cache delta ).
166    fn get_data(&mut self, lpnum: u64, a: &AccessPagedData) -> (Data, usize) {
167        if !a.writer
168            && let Some((_k, v)) = self.history.range(a.time..).next()
169        {
170            return (v.clone(), 0);
171        }
172
173        if let Some(p) = &self.current {
174            return (p.clone(), 0);
175        }
176
177        // Get data from page storage.
178        let ps = a.spd.ps.read().unwrap();
179        let data = ps.get_page(lpnum);
180        self.current = Some(data.clone());
181        let len = data.len();
182        (data, len)
183    }
184
185    /// Set the page data, updating the history using the specified time and old data.
186    /// Result is delta of length (old size, new size)
187    fn set_data(&mut self, time: u64, old: Data, data: Data, do_history: bool) -> (usize, usize) {
188        if do_history {
189            self.history.insert(time, old);
190        }
191        let old = if let Some(x) = &self.current {
192            x.len()
193        } else {
194            0
195        };
196        let new = data.len();
197        self.current = if new == 0 { None } else { Some(data) };
198        (old, new)
199    }
200
201    /// Trim entry for time t that no longer need to be retained, returning whether entry was retained.
202    /// start is start of range for which no readers exist.
203    fn trim(&mut self, t: u64, start: u64) -> bool {
204        let first = self.history_start(t);
205        if first >= start {
206            // There is no reader that can read copy for time t, so copy can be removed.
207            self.history.remove(&t);
208            false
209        } else {
210            true
211        }
212    }
213
214    /// Returns the earliest time that would return the page for the specified time.
215    fn history_start(&self, t: u64) -> u64 {
216        if let Some((k, _)) = self.history.range(..t).next_back() {
217            *k + 1
218        } else {
219            0
220        }
221    }
222}
223
/// Central store of data.
#[derive(Default)]
pub struct Stash {
    /// Write time - number of writes. Incremented by end_write.
    time: u64,
    /// Page number -> page info.
    pub pages: HashMap<u64, PageInfoPtr>,
    /// Time -> reader count. Number of readers for given time.
    rdrs: BTreeMap<u64, usize>,
    /// Time -> set of page numbers. Page copies held for given time.
    vers: BTreeMap<u64, HashSet<u64>>,
    /// Total size of current pages.
    pub total: i64, // Use i64 to avoid problems with overflow.
    /// trim_cache reduces total to mem_limit (or below).
    pub mem_limit: usize,
    /// Tracks loaded page with smallest usage ( eviction candidate for trim_cache ).
    min: Heap,
    /// Total number of page accesses.
    pub read: u64,
    /// Total number of misses ( data was not already loaded ).
    pub miss: u64,
}
246
247impl Stash {
248    /// Set the value of the specified page for the current time.
249    fn set(&mut self, lpnum: u64, old: Data, data: Data) {
250        let time = self.time;
251        let u = self.vers.entry(time).or_default();
252        let do_history = u.insert(lpnum);
253        let p = self.get_pinfo(lpnum);
254        let diff = p.lock().unwrap().set_data(time, old, data, do_history);
255        self.delta(diff, false, false);
256    }
257
258    /// Get the PageInfoPtr for the specified page and note the page as used.
259    fn get_pinfo(&mut self, lpnum: u64) -> PageInfoPtr {
260        let p = self
261            .pages
262            .entry(lpnum)
263            .or_insert_with(PageInfo::new)
264            .clone();
265        p.lock().unwrap().inc_usage(lpnum, &mut self.min);
266        self.read += 1;
267        p
268    }
269
270    /// Register that there is a client reading. The result is the current time.
271    fn begin_read(&mut self) -> u64 {
272        let time = self.time;
273        let n = self.rdrs.entry(time).or_insert(0);
274        *n += 1;
275        time
276    }
277
278    /// Register that the read at the specified time has ended. Stashed pages may be freed.
279    fn end_read(&mut self, time: u64) {
280        let n = self.rdrs.get_mut(&time).unwrap();
281        *n -= 1;
282        if *n == 0 {
283            self.rdrs.remove(&time);
284            self.trim(time);
285        }
286    }
287
288    /// Register that an update operation has completed. Time is incremented.
289    /// Stashed pages may be freed. Returns number of pages updated.
290    fn end_write(&mut self) -> usize {
291        let result = if let Some(u) = self.vers.get(&self.time) {
292            u.len()
293        } else {
294            0
295        };
296        let t = self.time;
297        self.time = t + 1;
298        self.trim(t);
299        result
300    }
301
302    /// Trim historic data that is no longer required.
303    fn trim(&mut self, time: u64) {
304        let (s, r) = (self.start(time), self.retain(time));
305        if s != r {
306            let mut empty = Vec::<u64>::new();
307            for (t, pl) in self.vers.range_mut(s..r) {
308                pl.retain(|pnum| {
309                    let p = self.pages.get(pnum).unwrap();
310                    p.lock().unwrap().trim(*t, s)
311                });
312                if pl.is_empty() {
313                    empty.push(*t);
314                }
315            }
316            for t in empty {
317                self.vers.remove(&t);
318            }
319        }
320    }
321
322    /// Calculate the start of the range of times for which there are no readers.
323    fn start(&self, time: u64) -> u64 {
324        if let Some((t, _n)) = self.rdrs.range(..time).next_back() {
325            1 + *t
326        } else {
327            0
328        }
329    }
330
331    /// Calculate the end of the range of times for which there are no readers.
332    fn retain(&self, time: u64) -> u64 {
333        if let Some((t, _n)) = self.rdrs.range(time..).next() {
334            *t
335        } else {
336            self.time
337        }
338    }
339
340    /// Adjust total.
341    fn delta(&mut self, d: (usize, usize), miss: bool, trim: bool) {
342        if miss {
343            self.miss += 1;
344        }
345        self.total += d.1 as i64 - d.0 as i64;
346        if trim {
347            self.trim_cache();
348        }
349    }
350
351    /// Trim cached data to configured limit.
352    pub fn trim_cache(&mut self) {
353        while self.total > self.mem_limit as i64 && self.min.len() > 0 {
354            let lpnum = self.min.pop();
355            let mut p = self.pages.get(&lpnum).unwrap().lock().unwrap();
356            p.hx = HX::MAX;
357            if let Some(data) = &p.current {
358                self.total -= data.len() as i64;
359                p.current = None;
360            }
361        }
362    }
363
364    /// Return the number of pages currently cached.
365    pub fn cached(&self) -> usize {
366        self.min.len() as usize
367    }
368}
369
/// Allows pages to be shared to allow concurrent readers.
pub struct SharedPagedData {
    /// Permanent storage of pages.
    pub ps: RwLock<Box<dyn PageStorage>>,
    /// Stash of pages ( cache with per-time history for readers ).
    pub stash: Mutex<Stash>,
    /// Info on page sizes.
    pub psi: Box<dyn PageStorageInfo>,
}
379
380impl SharedPagedData {
381    /*
382    #[cfg(feature = "compact")]
383    /// Construct default SharedPageData ( default depends on compact feature ).
384    pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
385        const EP_SIZE: usize = 1024; // Size of an extension page.
386        const EP_MAX: usize = 16; // Maximum number of extension pages.
387        const SP_SIZE: usize = (EP_MAX + 1) * 8; // =136. Starter page size.
388
389        Self::new_from_ps(Box::new(crate::compact::CompactFile::new(
390            stg, SP_SIZE, EP_SIZE,
391        )))
392    }
393
394    #[cfg(not(feature = "compact"))]
395    */
396
397    /// Construct default SharedPageData. stg is the backing storage.
398    pub fn new(stg: Box<dyn Storage>) -> Arc<Self> {
399        let limits = crate::Limits::default();
400        Self::new_from_ps(crate::blockpagestg::BlockPageStg::new(stg, &limits))
401    }
402
403    /// Construct SharedPageData based on specified PageStorage ( e.g. BlockPageStg )
404    pub fn new_from_ps(ps: Box<dyn PageStorage>) -> Arc<Self> {
405        // Set a default stash memory limit of 10 MB.
406        let stash = Stash {
407            mem_limit: 10 * 1024 * 1024,
408            ..Default::default()
409        };
410        let psi = ps.info();
411        Arc::new(Self {
412            stash: Mutex::new(stash),
413            ps: RwLock::new(ps),
414            psi,
415     
416        })
417    }
418
419    /// Get read access to a virtual read-only copy of the pages.
420    pub fn new_reader(self: &Arc<Self>) -> AccessPagedData
421    {
422        let time = self.stash.lock().unwrap().begin_read();
423        AccessPagedData {
424            writer: false,
425            time,
426            spd: self.clone(),
427        }
428    }
429
430    /// Get write access to the pages.
431    pub fn new_writer(self: &Arc<Self>) -> AccessPagedData{
432        AccessPagedData {
433            writer: true,
434            time: 0,
435            spd: self.clone(),
436        }
437    }
438
439    /// Wait until current commits have been written.
440    pub fn wait_complete(&self) {
441        self.ps.read().unwrap().wait_complete();
442    }
443}
444
/// Access to shared paged data.
pub struct AccessPagedData {
    // True for a writer ( sees current data ); false for a reader ( sees data as of `time` ).
    writer: bool,
    // Stash time at which a reader registered; 0 and unused for writers.
    time: u64,
    /// Shared Page Data.
    pub spd: Arc<SharedPagedData>,
}
452
453impl AccessPagedData {
454    /// Construct access to a virtual read-only copy of the pages.
455    #[deprecated(note="use SharedPagedData::new_reader instead")]
456    pub fn new_reader(spd: Arc<SharedPagedData>) -> Self {
457        let time = spd.stash.lock().unwrap().begin_read();
458        AccessPagedData {
459            writer: false,
460            time,
461            spd,
462        }
463    }
464
465    /// Construct access to the pages.
466    #[deprecated(note="use SharedPagedData::new_writer instead")] 
467    pub fn new_writer(spd: Arc<SharedPagedData>) -> Self {
468        AccessPagedData {
469            writer: true,
470            time: 0,
471            spd,
472        }
473    }
474
475    /// Get locked guard of stash.
476    pub fn stash(&self) -> std::sync::MutexGuard<'_, Stash> {
477        self.spd.stash.lock().unwrap()
478    }
479
480    /// Get the Data for the specified page.
481    pub fn get_data(&self, lpnum: u64) -> Data {
482        // Get page info.
483        let pinfo = self.stash().get_pinfo(lpnum);
484
485        // Read the page data.
486        let (data, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);
487
488        if loaded > 0 {
489            self.stash().delta((0, loaded), true, true);
490        }
491        data
492    }
493
494    /// Set the data of the specified page.
495    pub fn set_data(&self, lpnum: u64, data: Data) {
496        debug_assert!(self.writer);
497
498        // Get copy of current data.
499        let pinfo = self.stash().get_pinfo(lpnum);
500
501        // Read the page data.
502        let (old, loaded) = pinfo.lock().unwrap().get_data(lpnum, self);
503
504        // Update the stash ( ensures any readers will not attempt to read the file ).
505        {
506            let s = &mut *self.stash();
507            if loaded > 0 {
508                s.delta((0, loaded), true, false);
509            }
510            s.set(lpnum, old, data.clone());
511            s.trim_cache();
512        }
513
514        // Write data to underlying file.
515        if !data.is_empty() {
516            self.spd.ps.write().unwrap().set_page(lpnum, data);
517        } else {
518            self.spd.ps.write().unwrap().drop_page(lpnum);
519        }
520    }
521
522    /// Allocate a page.
523    pub fn alloc_page(&self) -> u64 {
524        debug_assert!(self.writer);
525        self.spd.ps.write().unwrap().new_page()
526    }
527
528    /// Free a page.
529    pub fn free_page(&self, lpnum: u64) {
530        self.set_data(lpnum, Data::default());
531    }
532
533    /// Is the underlying file new (so needs to be initialised ).
534    pub fn is_new(&self) -> bool {
535        self.writer && self.spd.ps.read().unwrap().is_new()
536    }
537
538    /// Check whether compressing a page is worthwhile.
539    pub fn compress(&self, size: usize, saving: usize) -> bool {
540        debug_assert!(self.writer);
541        self.spd.psi.compress(size, saving)
542    }
543
544    /// Commit changes to underlying file ( or rollback page allocations ).
545    pub fn save(&self, op: SaveOp) -> usize {
546        debug_assert!(self.writer);
547        match op {
548            SaveOp::Save => {
549                self.spd.ps.write().unwrap().save();
550                self.stash().end_write()
551            }
552            SaveOp::RollBack => {
553                // Note: rollback happens before any pages are updated.
554                // However page allocations need to be rolled back.
555                self.spd.ps.write().unwrap().rollback();
556                0
557            }
558        }
559    }
560
561    /// Renumber a page.
562    #[cfg(feature = "renumber")]
563    pub fn renumber_page(&self, lpnum: u64) -> u64 {
564        assert!(self.writer);
565        let data = self.get_data(lpnum);
566        self.stash().set(lpnum, data.clone(), Data::default());
567        let lpnum2 = self.spd.ps.write().unwrap().renumber(lpnum);
568        debug_assert!(
569            self.stash()
570                .get_pinfo(lpnum2)
571                .lock()
572                .unwrap()
573                .current
574                .is_none()
575        );
576        let old2 = self.get_data(lpnum2);
577        self.stash().set(lpnum2, old2, data);
578        lpnum2
579    }
580}
581
582impl Drop for AccessPagedData {
583    fn drop(&mut self) {
584        if !self.writer {
585            self.stash().end_read(self.time);
586        }
587    }
588}
589
/// Memory limits.
///
/// Default block capacity is 27720  = 5 x 7 x 8 x 9 x 11, which is divisible by 6..=12.
///
/// Giving page sizes of 2302, 2512, 2764, 3072, 3437, 3952, 4612 net of 8 byte page header.
#[non_exhaustive]
pub struct Limits {
    /// Atomic file limits
    pub af_lim: atom_file::Limits,
    /// Block capacity - default is 27720 = 5 x 7 x 8 x 9 x 11.
    pub blk_cap: u64,
    /// Page sizes - default is 7.
    pub page_sizes: usize,
    /// Largest division of page - default is 12.
    pub max_div: usize,
}
606
607impl Default for Limits {
608    fn default() -> Self {
609        Self {
610            af_lim: atom_file::Limits::default(),
611            blk_cap: 27720,
612            page_sizes: 7,
613            max_div: 12,
614        }
615    }
616}